# Managing input and output of Spark jobs

In [1]:
import pyspark
from pyspark.sql import SparkSession

# Obtain the session through the builder so this cell is safe to re-run:
# getOrCreate() returns the already-running session if there is one, whereas
# calling pyspark.SparkContext() a second time in the same kernel raises
# "Cannot run multiple SparkContexts at once".
spark = SparkSession.builder.getOrCreate()

# Keep `sc` defined for any cell that needs the lower-level SparkContext API.
sc = spark.sparkContext

## Reading data from data sources

Generic structure of the Read API:

```text
spark.read
  .format(..)   # specify file format
  .option(..)   # specify options
  .option(..)
  .schema(..)   # specify schema (if applicable)
  .load()
```
 

In [2]:
from pyspark.sql.types import StructField, StructType, StringType, LongType
# Declare the schema explicitly instead of letting Spark infer it.
# Each .add() call takes (column name, data type, nullable).
manualSchema = (
    StructType()
    .add("DEST_COUNTRY_NAME", StringType(), True)
    .add("ORIGIN_COUNTRY_NAME", StringType(), True)
    .add("count", LongType(), False)
)

# Read the flights CSV with that schema and preview the first 10 rows.
# The first line of the file is treated as a header; FAILFAST makes the read
# raise on a malformed record instead of silently nulling or dropping it.
(
    spark.read
    .format("csv")
    .option("header", "true")
    .option("mode", "FAILFAST")
    .schema(manualSchema)
    .load('../data/flights.csv')
    .show(10)
)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
|    United States|          Singapore|    1|
|    United States|            Grenada|   62|
|       Costa Rica|      United States|  588|
|          Senegal|      United States|   40|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+
only showing top 10 rows



Besides CSV, Spark supports several other data formats, such as plain text files, JSON, Parquet, ORC and more. For example, here is how you can read data from Parquet, which is a powerful column-oriented data format:

In [3]:
# Illustrative sketch only, intentionally left commented out: replace `...`
# with the path to a Parquet file or directory before running it.
# spark.read.format("parquet").load(...)

### Reading data in parallel

In [4]:
# The trailing glob matches every file under parallel_data/, so a single
# read call loads all of them into one DataFrame.
df = (
    spark.read
    .format("csv")
    .options(header="true", inferSchema="true")
    .load('../data/parallel_data/*')
)

# Number of partitions backing the DataFrame that was just loaded.
df.rdd.getNumPartitions()

3

## Writing data

In [8]:
# Load the flights CSV with the explicit schema, keeping the DataFrame this
# time so it can be transformed and written back out.
df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("mode", "FAILFAST")
    .schema(manualSchema)
    .load('../data/flights.csv')
)

# Example transformation: keep rows whose count exceeds 150, then
# redistribute them across 4 partitions.
df2 = df.filter('count > 150').repartition(4)

# Write the result as CSV under ../output; 'overwrite' replaces whatever is
# already at that path.
(
    df2.write
    .format("csv")
    .mode('overwrite')
    .save('../output')
)

In [9]:
# Collapse the data down to a single partition before writing.
df3 = df2.coalesce(1)

# Write as CSV partitioned by destination country. The target is ../output
# with mode 'overwrite', so existing content at that path is replaced.
(
    df3.write
    .format('csv')
    .mode('overwrite')
    .partitionBy('DEST_COUNTRY_NAME')
    .save('../output')
)

In [14]:
# Bucketing parameters: how many buckets to create and which column to
# bucket on.
numberOfBuckets, bucketCol = 10, 'count'

# Bucketed output is saved as a table (saveAsTable) rather than to a plain
# path; 'overwrite' replaces an existing table of the same name.
(
    df3.write
    .format('csv')
    .mode('overwrite')
    .bucketBy(numberOfBuckets, bucketCol)
    .saveAsTable('bucketed_flights')
)