# Ввод и вывод в Spark: работа с файлами

Стандартная "шапка" - настройка окружения на работу со spark

**Важно** обратите внимание - SPARK_HOME указывает на путь, куда установлен Spark на локальном компютере (поправьте под себя) 

In [3]:
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/opt/spark"

import findspark
findspark.find()
findspark.init()

from pyspark.sql import SparkSession

# os.environ["SPARK_HOME"] = "/home/mk/mk_win/projects/SparkEdu/lib/python3.5/site-packages/pyspark"
# os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
# os.environ["PYSPARK_DRIVER_PYTHON"] = "python3"
# os.environ["PYSPARK_SUBMIT_ARGS"] = "pyspark-shell"

Создаем сессию, работаем в "локальном" режиме.

In [4]:
master = "local"
#master = "yarn" В кластере
spark = SparkSession.builder.master(master).appName("spark_test").getOrCreate()

Загрузим наш файл со странами мира, на забываем - первая строка, заголовок (опция `header`)

In [8]:
# цепочка обработки при чтении

df = spark.read.format("csv") \
    .option("mode", "FAILFAST") \
    .option("inferSchema", "true") \
    .option("header","true") \
    .option("path", "./countries_of_the_world.xls") \
    .load()

Покажем первые 3 строки

In [9]:
df.show(3)

+------------+--------------------+----------+--------------+--------------------------+----------------------------+-------------+----------------------------------+------------------+------------+-----------------+----------+---------+---------+-------+---------+---------+-----------+--------+-------+
|     Country|              Region|Population|Area (sq. mi.)|Pop. Density (per sq. mi.)|Coastline (coast/area ratio)|Net migration|Infant mortality (per 1000 births)|GDP ($ per capita)|Literacy (%)|Phones (per 1000)|Arable (%)|Crops (%)|Other (%)|Climate|Birthrate|Deathrate|Agriculture|Industry|Service|
+------------+--------------------+----------+--------------+--------------------------+----------------------------+-------------+----------------------------------+------------------+------------+-----------------+----------+---------+---------+-------+---------+---------+-----------+--------+-------+
|Afghanistan |ASIA (EX. NEAR EA...|  31056997|        647500|                      48

Посмотреть схему файла можно с помощью метода `printSchema()`

In [10]:
df.printSchema()

root
 |-- Country: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Population: integer (nullable = true)
 |-- Area (sq. mi.): integer (nullable = true)
 |-- Pop. Density (per sq. mi.): string (nullable = true)
 |-- Coastline (coast/area ratio): string (nullable = true)
 |-- Net migration: string (nullable = true)
 |-- Infant mortality (per 1000 births): string (nullable = true)
 |-- GDP ($ per capita): integer (nullable = true)
 |-- Literacy (%): string (nullable = true)
 |-- Phones (per 1000): string (nullable = true)
 |-- Arable (%): string (nullable = true)
 |-- Crops (%): string (nullable = true)
 |-- Other (%): string (nullable = true)
 |-- Climate: string (nullable = true)
 |-- Birthrate: string (nullable = true)
 |-- Deathrate: string (nullable = true)
 |-- Agriculture: string (nullable = true)
 |-- Industry: string (nullable = true)
 |-- Service: string (nullable = true)



Загрузка JSON файла (столицы) происходит полностью аналогично, меняются только формат файла и, возможно, набор опций (зависит от специфики файла).

In [0]:
# загрузим JSON
dfj = spark.read.format("json") \
    .option("mode", "FAILFAST") \
    .option("inferSchema", "true") \
    .option("path", "./capital.json") \
    .load()
dfj.show(3)

+----------------+---------+-----+----------+----------+------+-------+------+---+------------+---------+------+--------+----------+---------+----+--------+----------+-----+--------+-----------+-----+------+---------+----------+--------+--------+-------------------+-----+---+--------+------+-------+---+--------+-----+--------+------+-----------+--------+------+-----------+-----+------------+------+--------+-------+-------+------+--------+------+-----+-----------+----------------+-------+------+------+--------+----------+------+-------------+-------+-----+-------+-----+--------+------+------+-----------+--------+----+-------+-------+--------+-----+----------+------+------------+-------+-------+-------------+-----+---------+----+------+-------+-----------+------+------+---------+--------------+-------+------+----------+---------+---+-----------+------+--------------+--------+-------+------+---------+--------------------+---------+------------+-------+------+---------+----+------------+--

Запись в файлы происходит так же просто (и универсально - нужно лишь указать формат результирующего файла).

In [0]:
# цепочка обработки при записи
df.write.format("csv") \
    .mode("overwrite") \
    .option("sep", "\t") \
    .save("./new.csv")

In [0]:
# сохраним в виде JSON
df.write.format("json") \
    .mode("overwrite") \
    .save("./new.json")

In [0]:
spark.stop()