# Chapter 9 - Data Sources

### Read
To read data we use a `DataFrameReader`, available through the `read` attribute of a `SparkSession` (it is a property, so it is accessed without parentheses).
Its behavior is customized through options, and loading returns a DataFrame.
At a minimum, the path of the file to read must be provided.

### Write 
To write data we use a `DataFrameWriter`, available through the `write` attribute of a DataFrame. It also accepts options.

## CSV files

In [1]:
from pyspark.sql import SparkSession
import os

# launch Spark with the SQLite JDBC jar, needed for the database queries later in this chapter
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars file:' + os.path.abspath('..') + '/jar/sqlite-jdbc-3.25.2.jar pyspark-shell'
spark = SparkSession.builder.master("local").appName("chapter9").getOrCreate()

In [2]:
# Method 1: specify the path as argument of load()
csv_df = spark.read.format("csv")\
            .option("header", "true")\
            .option("inferSchema", "true")\
            .option("mode", "failFast")\
            .load("../data/flight-data/csv/2011-summary.csv")

In [3]:
# Method 2: specify the path as option and leave load() with no arguments
csv_df = spark.read.format("csv")\
            .option("header", "true")\
            .option("inferSchema", "true")\
            .option("mode", "failFast")\
            .option("path", "../data/flight-data/csv/2011-summary.csv")\
            .load()

In [4]:
# add a column and store as new file
from pyspark.sql.functions import lit

new_df = csv_df.withColumn("NewData", lit(1))
new_df.printSchema()

# store in a csv file
new_df.write.format("csv").mode("overwrite").option("sep", "\t").save("../data/custom-data/chapter9-sample.csv")

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)
 |-- NewData: integer (nullable = false)



## JSON files

In [5]:
json_df = spark.read.format("json")\
        .option("mode", "FAILFAST")\
        .option("inferSchema", "true")\
        .option("path", "../data/flight-data/json/2010-summary.json")\
        .load()
json_df.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [6]:
from pyspark.sql.functions import col

new_df = json_df.withColumn("twice_count", (col("count").cast("float") * 2).cast("int"))
new_df.show(5)

new_df.write.format("json").mode("overwrite").save("../data/custom-data/chapter9-sample.json")

+-----------------+-------------------+-----+-----------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|twice_count|
+-----------------+-------------------+-----+-----------+
|    United States|            Romania|    1|          2|
|    United States|            Ireland|  264|        528|
|    United States|              India|   69|        138|
|            Egypt|      United States|   24|         48|
|Equatorial Guinea|      United States|    1|          2|
+-----------------+-------------------+-----+-----------+
only showing top 5 rows



## Parquet files

The Parquet file is compressed (gz). The flight data for 2010 weighs 3.8K in Parquet format, compared to 20K in JSON.

In [7]:
parquet_df = spark.read.format("parquet").load("../data/flight-data/parquet/2010-summary.parquet")
parquet_df.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [8]:
new_df = parquet_df.withColumn("twice_count", (col("count").cast("float") * 2).cast("int"))
new_df.show(5)

new_df.write.format("parquet").mode("overwrite").save("../data/custom-data/chapter9-sample.parquet")

+-----------------+-------------------+-----+-----------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|twice_count|
+-----------------+-------------------+-----+-----------+
|    United States|            Romania|    1|          2|
|    United States|            Ireland|  264|        528|
|    United States|              India|   69|        138|
|            Egypt|      United States|   24|         48|
|Equatorial Guinea|      United States|    1|          2|
+-----------------+-------------------+-----+-----------+
only showing top 5 rows



## SQL databases

### SQLite files

In [22]:
driver = "org.sqlite.JDBC"
path = "../data/flight-data/jdbc/my-sqlite.db"
url = "jdbc:sqlite:" + path
tablename = "flight_info"

dbDF = spark.read.format("jdbc").option("url", url).option('dbtable', tablename).option("driver", driver).load()
dbDF.printSchema()
dbDF.show(5)

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: decimal(20,0) (nullable = true)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



### Query pushdown
Instead of specifying a full table, specify a query and load its results into the DataFrame, so that the filtering is done by the database.

In [23]:
dbDF.filter(col("DEST_COUNTRY_NAME")=="Egypt").show()
# note: explain() is called on the unfiltered dbDF, so the plan below lists no pushed filters
dbDF.explain()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|            Egypt|      United States|   24|
+-----------------+-------------------+-----+

== Physical Plan ==
*(1) Scan JDBCRelation(flight_info) [numPartitions=1] [DEST_COUNTRY_NAME#224,ORIGIN_COUNTRY_NAME#225,count#226] PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:decimal(20,0)>


In [24]:
pushdown_query = """(SELECT * FROM flight_info WHERE DEST_COUNTRY_NAME='Egypt') AS flight_info"""
pdDF = spark.read.format("jdbc").option("url", url).option('dbtable', pushdown_query).option("driver", driver).load()
pdDF.show()
pdDF.explain()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|            Egypt|      United States|   24|
+-----------------+-------------------+-----+

== Physical Plan ==
*(1) Scan JDBCRelation((SELECT * FROM flight_info WHERE DEST_COUNTRY_NAME='Egypt') AS flight_info) [numPartitions=1] [DEST_COUNTRY_NAME#253,ORIGIN_COUNTRY_NAME#254,count#255] PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:decimal(20,0)>
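
The pushdown works because the value of `dbtable` can be anything that is valid in a SQL `FROM` clause, including a parenthesized subquery with an alias. The sketch below illustrates this with Python's built-in `sqlite3` module and no Spark at all. The helper `read_dbtable` is hypothetical and only mimics the general shape of such a query; the in-memory table holds a few rows copied from the outputs above.

```python
import sqlite3

# In-memory stand-in for my-sqlite.db, filled with a few rows shown earlier.
conn = sqlite3.connect(":memory:")
conn.execute(
    'CREATE TABLE flight_info '
    '(DEST_COUNTRY_NAME TEXT, ORIGIN_COUNTRY_NAME TEXT, "count" INTEGER)'
)
conn.executemany(
    "INSERT INTO flight_info VALUES (?, ?, ?)",
    [
        ("United States", "Romania", 1),
        ("United States", "Ireland", 264),
        ("United States", "India", 69),
        ("Egypt", "United States", 24),
        ("Equatorial Guinea", "United States", 1),
    ],
)


def read_dbtable(connection, dbtable):
    """Hypothetical helper: place `dbtable` in the FROM clause as-is."""
    return connection.execute(f"SELECT * FROM {dbtable}").fetchall()


# A plain table name returns every row.
full = read_dbtable(conn, "flight_info")

# A subquery in the same position is filtered by the database itself.
pushdown_query = (
    "(SELECT * FROM flight_info WHERE DEST_COUNTRY_NAME='Egypt') AS flight_info"
)
filtered = read_dbtable(conn, pushdown_query)

print(len(full))
print(filtered)
```

Only the rows selected by the subquery ever leave the database, which is the point of pushing the query down.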


### Reading from databases in parallel


In [25]:
dbDF = spark.read.format("jdbc").option("url", url)\
                                .option('dbtable', tablename)\
                                .option("driver", driver)\
                                .option("numPartitions", 10)\
                                .load()


In [26]:
# each predicate defines one partition, so this read creates two partitions
props = {"driver":"org.sqlite.JDBC"}
predicates = ["DEST_COUNTRY_NAME = 'Sweden' OR ORIGIN_COUNTRY_NAME = 'Sweden'",
              "DEST_COUNTRY_NAME = 'Anguilla' OR ORIGIN_COUNTRY_NAME = 'Anguilla'"]
spark.read.jdbc(url, tablename, predicates=predicates, properties=props).show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|           Sweden|      United States|   65|
|    United States|             Sweden|   73|
|         Anguilla|      United States|   21|
|    United States|           Anguilla|   20|
+-----------------+-------------------+-----+



In [28]:
# if the predicates are not disjoint, a row is returned once per predicate it matches
# (here every row matches both predicates, hence 2 x 255 = 510 rows)
props = {"driver":"org.sqlite.JDBC"}
predicates = ["DEST_COUNTRY_NAME != 'Sweden' OR ORIGIN_COUNTRY_NAME != 'Sweden'",
              "DEST_COUNTRY_NAME != 'Anguilla' OR ORIGIN_COUNTRY_NAME != 'Anguilla'"]
spark.read.jdbc(url, tablename, predicates=predicates, properties=props).count()

510
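
The duplication can be reproduced without Spark. The sketch below assumes that each predicate is executed as its own `WHERE` clause and that the results are concatenated, one partition per predicate. The helper `read_with_predicates` is hypothetical, and the in-memory table contains only a handful of rows taken from the outputs in this chapter.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    'CREATE TABLE flight_info '
    '(DEST_COUNTRY_NAME TEXT, ORIGIN_COUNTRY_NAME TEXT, "count" INTEGER)'
)
conn.executemany(
    "INSERT INTO flight_info VALUES (?, ?, ?)",
    [
        ("Sweden", "United States", 65),
        ("United States", "Sweden", 73),
        ("Anguilla", "United States", 21),
        ("United States", "Anguilla", 20),
        ("Egypt", "United States", 24),
    ],
)


def read_with_predicates(connection, table, predicates):
    """Hypothetical helper: run one query per predicate and concatenate."""
    rows = []
    for predicate in predicates:
        query = f"SELECT * FROM {table} WHERE {predicate}"
        rows.extend(connection.execute(query).fetchall())
    return rows


disjoint = [
    "DEST_COUNTRY_NAME = 'Sweden' OR ORIGIN_COUNTRY_NAME = 'Sweden'",
    "DEST_COUNTRY_NAME = 'Anguilla' OR ORIGIN_COUNTRY_NAME = 'Anguilla'",
]
overlapping = [
    "DEST_COUNTRY_NAME != 'Sweden' OR ORIGIN_COUNTRY_NAME != 'Sweden'",
    "DEST_COUNTRY_NAME != 'Anguilla' OR ORIGIN_COUNTRY_NAME != 'Anguilla'",
]

disjoint_rows = read_with_predicates(conn, "flight_info", disjoint)
overlapping_rows = read_with_predicates(conn, "flight_info", overlapping)

print(len(disjoint_rows))     # 4: each matching row appears once
print(len(overlapping_rows))  # 10: all 5 rows match both predicates
```

With disjoint predicates every matching row lands in exactly one partition. With overlapping predicates the same row is fetched by several partitions, so it shows up several times in the result.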

In [31]:
# import the module under an alias so that Spark's max does not shadow Python's built-in max
from pyspark.sql import functions as F
dbDF.select(F.max("count")).show()

+----------+
|max(count)|
+----------+
|    348113|
+----------+



In [37]:
colName = "count"
lowerBound = 0
upperBound = 348113
numPartitions = 10

spark.read.jdbc(url, tablename, column=colName, 
                lowerBound=lowerBound, upperBound=upperBound, 
                numPartitions=numPartitions, 
                properties=props).count()

255
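
The count is still 255 because `lowerBound` and `upperBound` only determine the partition stride; they do not filter rows. A rough sketch of how the bounds could be turned into one `WHERE` clause per partition is given below. It is a simplified illustration for integer bounds, not Spark's actual implementation, and the function name `partition_where_clauses` is hypothetical.

```python
def partition_where_clauses(column, lower_bound, upper_bound, num_partitions):
    """Return one WHERE clause per partition for a stride-based split.

    Simplified sketch for integer bounds and at least two partitions.
    """
    if num_partitions < 2:
        raise ValueError("this sketch only covers two or more partitions")

    stride = upper_bound // num_partitions - lower_bound // num_partitions
    clauses = []
    current = lower_bound
    for i in range(num_partitions):
        # The first partition has no lower limit, the last one no upper limit,
        # so values outside [lower_bound, upper_bound] are still read.
        lower = f"{column} >= {current}" if i != 0 else None
        current += stride
        upper = f"{column} < {current}" if i != num_partitions - 1 else None

        if upper is None:
            clauses.append(lower)
        elif lower is None:
            # NULL values are assigned to the first partition.
            clauses.append(f"{upper} OR {column} IS NULL")
        else:
            clauses.append(f"{lower} AND {upper}")
    return clauses


clauses = partition_where_clauses("count", 0, 348113, 10)
for clause in clauses:
    print(clause)
# first line: count < 34811 OR count IS NULL
# last line:  count >= 313299
```

Since the first and last clauses are open-ended, the ten clauses together cover every row of the table. Badly chosen bounds therefore skew the partition sizes but do not lose data.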

### Writing to SQL databases

In [38]:
# use a separate variable so that `url` keeps pointing at the source database
new_url = "jdbc:sqlite:" + os.path.abspath('..') + '/data/custom-data/chapter9-sample.db'
csv_df.write.jdbc(new_url, tablename, mode="overwrite", properties=props)
# inspect the results
spark.read.jdbc(new_url, tablename, properties=props).count()

In [40]:
# appending to the new db
csv_df.write.jdbc(new_url, tablename, mode="append", properties=props)
spark.read.jdbc(new_url, tablename, properties=props).count()

510

## Text files

In [43]:
spark.read.text("../data/flight-data/csv/2011-summary.csv").selectExpr("split(value, ',') as rows").show()

+--------------------+
|                rows|
+--------------------+
|[DEST_COUNTRY_NAM...|
|[United States, S...|
|[United States, G...|
|[United States, C...|
|[United States, R...|
|[United States, I...|
|[Egypt, United St...|
|[United States, I...|
|[United States, S...|
|[United States, G...|
|[Costa Rica, Unit...|
|[Senegal, United ...|
|[Guyana, United S...|
|[United States, M...|
|[United States, S...|
|[Malta, United St...|
|[Bolivia, United ...|
|[Anguilla, United...|
|[United States, P...|
|[United States, G...|
+--------------------+
only showing top 20 rows



In [44]:
csv_df.select("DEST_COUNTRY_NAME").write.text("../data/custom-data/chapter9-sample.txt")

In [45]:
# the text writer accepts a single string column; extra columns can only be kept
# by using them as partition columns, which encodes their values in directory names
csv_df.limit(10).select("DEST_COUNTRY_NAME", "count").write.partitionBy("count")\
                .text("../data/custom-data/chapter9-sample-partitioned.txt")