# Spark 101: Reading and Writing Data with Spark

This notebook is inspired by https://www.udacity.com/course/learn-spark-at-udacity--ud2002 and is an attempt to better understand PySpark.


In [1]:
# Standard data-science libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, precision_recall_curve

# Spark SQL: session, window functions, column functions and types
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.functions import lit, udf, struct, countDistinct, collect_list, avg, count, col
from pyspark.sql.types import ArrayType, BooleanType, LongType, FloatType, IntegerType

# Spark ML: feature engineering, models, evaluation and tuning
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, Normalizer, StandardScaler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

## Creating an RDD

There are two main ways to create RDDs (and the DataFrames built on top of them):
   1) distributing an existing collection of objects from the driver program
   2) loading an external dataset

### 1) Distributing a collection of objects using ***parallelize()***

In [22]:
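# Build (or reuse) a SparkSession; the .config() line is only a placeholder option
spark = SparkSession \
    .builder \
    .appName("Python Spark create RDD example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# parallelize() distributes a local list as an RDD; toDF() turns it into a DataFrame
df = spark.sparkContext.parallelize(
    [(1, 2, 3, 'a b c'),  # row 1
     (4, 5, 6, 'd e f'),  # row 2
     (7, 8, 9, 'g h i')]  # row 3
).toDF(['col1', 'col2', 'col3', 'col4'])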

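`parallelize()` creates an RDD, and `toDF()` converts it into a DataFrame with the given column names. `show()` prints the DataFrame as a table: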

In [12]:
df.show()

+----+----+----+-----+
|col1|col2|col3| col4|
+----+----+----+-----+
|   1|   2|   3|a b c|
|   4|   5|   6|d e f|
|   7|   8|   9|g h i|
+----+----+----+-----+



In [14]:
spark = SparkSession \
    .builder \
    .appName("Python Spark create RDD example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

myData = spark.sparkContext.parallelize([(1,2), (3,4), (5,6), (7,8), (9,10)])

In [15]:
myData.collect()

[(1, 2), (3, 4), (5, 6), (7, 8), (9, 10)]
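`parallelize()` splits the local list into partitions that can be processed in parallel, and `collect()` brings every element back to the driver in order. The plain-Python sketch below illustrates the contiguous, roughly equal slicing used by Spark's Scala `ParallelCollectionRDD`; it is only an illustration (PySpark additionally batches and serializes the data before handing it to the JVM), and `slice_into_partitions` is a hypothetical helper. In PySpark the number of slices is set with the `numSlices` argument, e.g. `spark.sparkContext.parallelize(data, numSlices=2)`, and `myData.glom().collect()` shows the contents of each partition.

```python
def slice_into_partitions(data, num_slices):
    """Split `data` into `num_slices` contiguous, roughly equal chunks."""
    if num_slices < 1:
        raise ValueError("num_slices must be positive")
    n = len(data)
    partitions = []
    for i in range(num_slices):
        # Start/end arithmetic mirroring Spark's ParallelCollectionRDD slicing
        start = (i * n) // num_slices
        end = ((i + 1) * n) // num_slices
        partitions.append(data[start:end])
    return partitions


def collect(partitions):
    """Concatenate the partitions in order, as RDD.collect() returns them."""
    return [item for part in partitions for item in part]


my_data = [(1, 2), (3, 4), (5, 6), (7, 8), (9, 10)]
parts = slice_into_partitions(my_data, 2)
print(parts)           # [[(1, 2), (3, 4)], [(5, 6), (7, 8), (9, 10)]]
print(collect(parts))  # [(1, 2), (3, 4), (5, 6), (7, 8), (9, 10)]
```

Contiguous slices keep neighbouring elements together, which is why collecting the partitions in order gives back the original list.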

### 2) Creating a DataFrame from a collection of objects using ***createDataFrame()***

In [18]:
spark = SparkSession \
    .builder \
    .appName("Python Spark create RDD example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# Every value is quoted, so all four columns are string columns
Employee = spark.createDataFrame([
                        ('1', 'Joe',   '70000', '1'),
                        ('2', 'Henry', '80000', '2'),
                        ('3', 'Sam',   '60000', '2'),
                        ('4', 'Max',   '90000', '1')],
                        ['Id', 'Name', 'Salary', 'DepartmentId'])

In [21]:
Employee.show()

+---+-----+------+------------+
| Id| Name|Salary|DepartmentId|
+---+-----+------+------------+
|  1|  Joe| 70000|           1|
|  2|Henry| 80000|           2|
|  3|  Sam| 60000|           2|
|  4|  Max| 90000|           1|
+---+-----+------+------------+



### 3) Reading a dataset from a file

In [2]:
# create a Spark session
spark = SparkSession \
    .builder \
    .appName("Sparkify_Project") \
    .getOrCreate()

In [3]:
in_path = "mini_sparkify_event_data.json"
df = spark.read.json(in_path)

In [4]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



Spark can also read data from other sources, such as SQL databases (through JDBC) and HDFS. For further information, see https://runawayhorse001.github.io/LearningApacheSpark/rdd.html

Let's use the `describe()` method to see what we can learn from the data.

In [5]:
df.describe()

DataFrame[summary: string, artist: string, auth: string, firstName: string, gender: string, itemInSession: string, lastName: string, length: string, level: string, location: string, method: string, page: string, registration: string, sessionId: string, song: string, status: string, ts: string, userAgent: string, userId: string]
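For reference, `describe()` computes five statistics per column: `count` (non-null values only), `mean`, `stddev` (the sample standard deviation), `min` and `max`; for instance, `df.describe("length").show()` would print them for a single column. The plain-Python sketch below reproduces those numbers for a small list of values; `describe_column` is a hypothetical helper and the input values are made up for illustration.

```python
import statistics


def describe_column(values):
    """Summary statistics in the spirit of Spark's describe(); None counts as null."""
    present = [v for v in values if v is not None]  # count ignores nulls
    return {
        "count": len(present),
        "mean": statistics.mean(present) if present else None,
        # sample standard deviation (n - 1 denominator)
        "stddev": statistics.stdev(present) if len(present) > 1 else None,
        "min": min(present) if present else None,
        "max": max(present) if present else None,
    }


# Made-up itemInSession-style values, including one null
stats = describe_column([2, 4, 4, 4, 5, 5, 7, 9, None])
for name, value in stats.items():
    print(f"{name:>6}: {value}")
```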

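`describe()` does not print anything by itself: it returns a new DataFrame of summary statistics, and because Spark evaluates lazily, the cell only displays that DataFrame's schema (every summary column is a string). Calling `df.describe().show()` computes and prints the statistics. We can also look at individual records; let's see what the first one looks like.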

In [6]:
df.show(n=1)

+--------------+---------+---------+------+-------------+--------+---------+-----+---------------+------+--------+-------------+---------+---------+------+-------------+--------------------+------+
|        artist|     auth|firstName|gender|itemInSession|lastName|   length|level|       location|method|    page| registration|sessionId|     song|status|           ts|           userAgent|userId|
+--------------+---------+---------+------+-------------+--------+---------+-----+---------------+------+--------+-------------+---------+---------+------+-------------+--------------------+------+
|Martha Tilston|Logged In|    Colin|     M|           50| Freeman|277.89016| paid|Bakersfield, CA|   PUT|NextSong|1538173362000|       29|Rockpools|   200|1538352117000|Mozilla/5.0 (Wind...|    30|
+--------------+---------+---------+------+-------------+--------+---------+-----+---------------+------+--------+-------------+---------+---------+------+-------------+--------------------+------+
only showing top 1 row

Now that we have successfully loaded the DataFrame, let's see how we can save it in a different format, such as CSV.

In [34]:
out_path = "mini_sparkify_event_data.csv"

In [35]:
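# The default save mode raises an error if out_path already exists;
# pass mode="overwrite" to be able to rerun this cell
df.write.save(out_path, format="csv", header=True)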
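`out_path` is treated as a directory, not a single file: Spark writes each partition of the DataFrame as its own `part-...` file (each with a header line when `header=True`) and typically adds an empty `_SUCCESS` marker once the job succeeds. The plain-Python sketch below imitates that layout with the `csv` module; `write_partitioned_csv` is a hypothetical helper, the sample rows are made up, and real part-file names also contain a unique job identifier.

```python
import csv
import os
import tempfile


def write_partitioned_csv(partitions, header, out_dir):
    """Write each partition to its own part file inside out_dir."""
    os.makedirs(out_dir)  # raises if out_dir exists, similar to the default save mode
    for i, rows in enumerate(partitions):
        path = os.path.join(out_dir, f"part-{i:05d}.csv")
        with open(path, "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(header)  # every part file gets its own header line
            writer.writerows(rows)
    # Empty marker file signalling that all parts were written
    open(os.path.join(out_dir, "_SUCCESS"), "w").close()


with tempfile.TemporaryDirectory() as tmp:
    out_dir = os.path.join(tmp, "mini_sparkify_event_data.csv")
    write_partitioned_csv(
        partitions=[[(1, "Joe")], [(2, "Henry"), (3, "Sam")]],
        header=["Id", "Name"],
        out_dir=out_dir,
    )
    print(sorted(os.listdir(out_dir)))  # ['_SUCCESS', 'part-00000.csv', 'part-00001.csv']
```

Reading the CSV back means reading every part file in that directory, which is why `spark.read.csv` is pointed at the directory rather than at a single file.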

Now let's load the CSV output we just saved back into a DataFrame:

In [36]:
df_csv = spark.read.csv(out_path, header = True)

Now let's check the schema of the CSV data:

In [37]:
df_csv.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: string (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: string (nullable = true)
 |-- sessionId: string (nullable = true)
 |-- song: string (nullable = true)
 |-- status: string (nullable = true)
 |-- ts: string (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
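CSV is plain text, so column types cannot survive a round trip through it: the writer turns every value into text, and a reader that does not infer types hands back strings. The sketch below reproduces this with Python's `csv` module on a few values from the first JSON record (columns chosen for illustration). In Spark, passing an explicit schema to `spark.read.csv` through its `schema` argument is one way to get typed columns back.

```python
import csv
import io

# Typed values from the first JSON record (long, double, long, string)
row = {"itemInSession": 50, "length": 277.89016, "ts": 1538352117000, "userId": "30"}

# Write a header line and one data row to an in-memory CSV "file"
buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=list(row))
writer.writeheader()
writer.writerow(row)

# Read it back: the column names survive, the types do not
buffer.seek(0)
restored = next(csv.DictReader(buffer))
print({name: type(value).__name__ for name, value in restored.items()})
# {'itemInSession': 'str', 'length': 'str', 'ts': 'str', 'userId': 'str'}
```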



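The column names and their order match the JSON version, but every column is now a `string`: CSV stores no type information, and `spark.read.csv` does not infer types unless `inferSchema=True` is passed. For example, `itemInSession`, `length` and `ts` were `long`, `double` and `long` when read from JSON.
We can also take a look at the first few records.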

In [39]:
df_csv.show(n=1)

+----------+---------+---------+------+-------------+--------+--------+-----+---------------+------+--------+-------------+---------+---------------+------+-------------+--------------------+------+
|    artist|     auth|firstName|gender|itemInSession|lastName|  length|level|       location|method|    page| registration|sessionId|           song|status|           ts|           userAgent|userId|
+----------+---------+---------+------+-------------+--------+--------+-----+---------------+------+--------+-------------+---------+---------------+------+-------------+--------------------+------+
|Duncan Dhu|Logged In|  Ashlynn|     F|           36|Williams|172.9824| paid|Tallahassee, FL|   PUT|NextSong|1537365219000|     1833|Reina De Africa|   200|1542041667000|"Mozilla/5.0 (Mac...|    74|
+----------+---------+---------+------+-------------+--------+--------+-----+---------------+------+--------+-------------+---------+---------------+------+-------------+--------------------+------+
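only showing top 1 row

Note that this is not the same record that `df.show(n=1)` printed from the JSON data: the output directory can contain several part files, and Spark does not guarantee that rows are read back in the order they were written.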

In [42]:
df_csv.take(1)

[Row(artist='Duncan Dhu', auth='Logged In', firstName='Ashlynn', gender='F', itemInSession='36', lastName='Williams', length='172.9824', level='paid', location='Tallahassee, FL', method='PUT', page='NextSong', registration='1537365219000', sessionId='1833', song='Reina De Africa', status='200', ts='1542041667000', userAgent='"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36"', userId='74')]
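`take(1)` returns a Python list of `Row` objects instead of printing a table, which makes it easy to see that every value is now a string. To compute with them, the columns have to be cast back, in Spark with expressions such as `col("length").cast("double")` (`col` is imported in the first cell). The plain-Python sketch below applies the same idea to this record, using a hypothetical `TARGET_TYPES` mapping that mirrors the JSON schema, and converts `ts`, which appears to hold milliseconds since the Unix epoch, into a readable UTC timestamp.

```python
from datetime import datetime, timezone

# String values as returned by df_csv.take(1) (a subset of the columns)
record = {"itemInSession": "36", "length": "172.9824", "ts": "1542041667000", "userId": "74"}

# Hypothetical mapping back to the types Spark inferred from the JSON file
TARGET_TYPES = {"itemInSession": int, "length": float, "ts": int, "userId": str}

typed = {name: TARGET_TYPES[name](value) for name, value in record.items()}
print(typed)
# {'itemInSession': 36, 'length': 172.9824, 'ts': 1542041667000, 'userId': '74'}

# ts counts milliseconds, so divide by 1000 before converting
event_time = datetime.fromtimestamp(typed["ts"] / 1000, tz=timezone.utc)
print(event_time.isoformat())  # 2018-11-12T16:54:27+00:00
```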

In [40]:
df_csv.select("userID").show()  # matches the userId column: column names are case-insensitive by default

+------+
|userID|
+------+
|    74|
|    62|
|     6|
|    29|
|    85|
|   132|
|    35|
|    35|
|    74|
|    62|
|    92|
|    37|
|   112|
|   112|
|    36|
|    29|
|     6|
|    35|
|   132|
|    62|
+------+
only showing top 20 rows

