# PySpark Lecture for Homework 2

## Creating a session


This notebook uses the `findspark` package to start *Spark*. 

This method assumes that you have a local installation of *Spark*, that the environment variable `SPARK_HOME` points to it, and that your `PATH` includes `SPARK_HOME\bin`.
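
The assumption above can be checked from Python before calling `findspark.init()`. The following is a minimal sketch using only the standard library; the helper name `check_spark_home` is hypothetical and not part of `findspark`.

```python
import os


def check_spark_home(env=None):
    """Return (ok, message) describing whether SPARK_HOME looks usable.

    `env` defaults to the real environment; a plain dict can be passed for testing.
    """
    env = os.environ if env is None else env
    spark_home = env.get("SPARK_HOME")
    if not spark_home:
        return False, "SPARK_HOME is not set"

    bin_dir = os.path.normpath(os.path.join(spark_home, "bin"))
    path_entries = env.get("PATH", "").split(os.pathsep)
    # Compare normalized paths so that trailing separators do not matter.
    on_path = any(os.path.normpath(p) == bin_dir for p in path_entries if p)
    if not on_path:
        return False, f"{bin_dir} is not on PATH"
    return True, f"SPARK_HOME={spark_home}"


ok, message = check_spark_home()
print(ok, message)
```

If the check fails, the path can still be passed to `findspark.init(...)` explicitly.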

To install: `pip install findspark`

```python
import findspark
findspark.init()
```

#### Or the following command

`findspark.init("/path/to/spark_home")`

#### The Java version should be 1.8 (Java 8).



### Alternatively, you could use the Python-only Spark package:
`pip install pyspark`



In [1]:
import findspark
findspark.init()

# Or the following command
#   findspark.init("/path/to/spark_home")



### Spark Session

The `SparkSession` is our main entry point into Spark, DataFrames, and the Spark SQL package.


In [2]:
from pyspark.sql import SparkSession

spark = SparkSession\
        .builder\
        .appName("HW2")\
        .getOrCreate()

spark

### Get the SparkContext from the current session


In [3]:
sc = spark.sparkContext
sc

# Complex Datatypes

There are three complex types in Spark:

- Array
- Struct
- Map

### Array

An array in Spark is a list of homogeneous elements, i.e. elements that all share the same datatype.

In [4]:
input_json = """
{
  "numbers": [1, 2, 3, 4, 5, 6]
}
"""
adf = spark.read.json(sc.parallelize([input_json]))
adf.printSchema()

root
 |-- numbers: array (nullable = true)
 |    |-- element: long (containsNull = true)



In [5]:
# Let’s have a look at the data
adf.show(truncate=False)

+------------------+
|numbers           |
+------------------+
|[1, 2, 3, 4, 5, 6]|
+------------------+



#### Flatten the Array using Explode

What if you want each element of the array in its own row?
This is what the `explode` function is for: as the name suggests, it breaks the array into rows containing one element each. Below is a simple use of `explode` on this array.

In [6]:
from pyspark.sql.functions import explode
adf.select(explode('numbers').alias('number')).show()

+------+
|number|
+------+
|     1|
|     2|
|     3|
|     4|
|     5|
|     6|
+------+
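
To make the row-per-element behavior concrete, here is a plain-Python analogy of `explode`. It does not use Spark; the helper `explode_rows` is a hypothetical name, and rows are modeled as dictionaries.

```python
def explode_rows(rows, column, alias):
    """Yield one output row per element of the array stored in `column`.

    Rows whose array is missing, None, or empty produce no output rows,
    which mirrors how explode skips null and empty arrays.
    """
    for row in rows:
        for element in row.get(column) or []:
            yield {alias: element}


rows = [{"numbers": [1, 2, 3, 4, 5, 6]}]
exploded = list(explode_rows(rows, "numbers", "number"))
for row in exploded:
    print(row)
```

One input row with a six-element array becomes six output rows, matching the table above.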



### Struct

A struct groups a set of named fields under a single parent column.
The fields inside a struct can be accessed with dot notation (for example, `car_details.model`).

In [7]:
input_json = """
{
  "car_details": {
     "model": "Tesla S",
     "year": 2018
  }
}
"""
sdf = spark.read.json(sc.parallelize([input_json]))
sdf.printSchema()

root
 |-- car_details: struct (nullable = true)
 |    |-- model: string (nullable = true)
 |    |-- year: long (nullable = true)



In [8]:
# This is how the data looks.
sdf.show()

+---------------+
|    car_details|
+---------------+
|[Tesla S, 2018]|
+---------------+
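
As a plain-Python analogy of dot notation on a struct, the sketch below resolves a dotted path against a nested dictionary. The helper `get_field` is hypothetical; in Spark itself the corresponding call would be along the lines of `sdf.select("car_details.model")`.

```python
def get_field(record, dotted_path):
    """Follow a dotted path such as 'car_details.model' through nested dicts.

    Returns None if any step of the path is missing.
    """
    value = record
    for name in dotted_path.split("."):
        if not isinstance(value, dict):
            return None
        value = value.get(name)
    return value


record = {"car_details": {"model": "Tesla S", "year": 2018}}
print(get_field(record, "car_details.model"))
print(get_field(record, "car_details.year"))
print(get_field(record, "car_details.color"))  # missing field -> None
```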



### Map

A map is a collection of key-value pairs, similar to a dictionary in Python.
Let's see how map elements can be read from a JSON record:

In [9]:
from pyspark.sql.types import StructType, MapType, StringType, IntegerType
input_json = """
{
  "Car": {
    "model_id": 835,
    "year": 2008
  }
}
"""
schema = StructType().add("Car", MapType(StringType(), IntegerType()))
mdf = spark.read.json(sc.parallelize([input_json]), schema=schema)
mdf.printSchema()

root
 |-- Car: map (nullable = true)
 |    |-- key: string
 |    |-- value: integer (valueContainsNull = true)



In [10]:
# This is how the data looks when displayed,
mdf.show(truncate=False)

+-------------------------------+
|Car                            |
+-------------------------------+
|[model_id -> 835, year -> 2008]|
+-------------------------------+



Individual elements can also be accessed with dictionary-style indexing, as shown below:

In [11]:
mdf.select(mdf.Car['model_id'], mdf.Car['year']).show()

+-------------+---------+
|Car[model_id]|Car[year]|
+-------------+---------+
|          835|     2008|
+-------------+---------+



# Loading data

In [12]:
# import some packages from the spark.sql namespace to deal with data types and schemas

from pyspark.sql.types import StructType, StructField, StringType, IntegerType


In the code below, **change** the paths to suit your own environment.
The datafiles in this homework are provided in the resources section of *NYUClasses*.

### CSV

### Load the Kaggle dataset `Transactions from a bakery`

Load the Kaggle dataset from the CSV file, containing ~21K records, into a DataFrame

In [13]:
# OPTION 1 - infer schema
# -----------------------
df_bakery = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema","true") \
    .load(r"D:\school\data\BreadBasket_DMS.csv")  # raw string: backslashes are kept literally

df_bakery.printSchema()
df_bakery.show(10)
df_bakery.count()

root
 |-- Date: timestamp (nullable = true)
 |-- Time: string (nullable = true)
 |-- Transaction: integer (nullable = true)
 |-- Item: string (nullable = true)

+-------------------+--------+-----------+-------------+
|               Date|    Time|Transaction|         Item|
+-------------------+--------+-----------+-------------+
|2016-10-30 00:00:00|09:58:11|          1|        Bread|
|2016-10-30 00:00:00|10:05:34|          2| Scandinavian|
|2016-10-30 00:00:00|10:05:34|          2| Scandinavian|
|2016-10-30 00:00:00|10:07:57|          3|Hot chocolate|
|2016-10-30 00:00:00|10:07:57|          3|          Jam|
|2016-10-30 00:00:00|10:07:57|          3|      Cookies|
|2016-10-30 00:00:00|10:08:41|          4|       Muffin|
|2016-10-30 00:00:00|10:13:03|          5|       Coffee|
|2016-10-30 00:00:00|10:13:03|          5|       Pastry|
|2016-10-30 00:00:00|10:13:03|          5|        Bread|
+-------------------+--------+-----------+-------------+
only showing top 10 rows



21293
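
The path in the cell above contains Windows backslashes, which Python treats as escape characters inside ordinary string literals. The sketch below shows three safe ways to write such a path, using only the standard library; the `D:\temp` folder is a hypothetical example chosen because `\t` is a real escape sequence.

```python
from pathlib import PureWindowsPath

escaped = "D:\\school\\data\\BreadBasket_DMS.csv"  # every backslash doubled
raw = r"D:\school\data\BreadBasket_DMS.csv"        # raw string literal
forward = "D:/school/data/BreadBasket_DMS.csv"     # forward slashes

# The escaped and raw forms are the same string.
print(escaped == raw)

# Forward slashes name the same Windows path.
print(PureWindowsPath(forward) == PureWindowsPath(raw))
print(PureWindowsPath(raw).name)

# Why it matters: "\t" silently becomes a tab character in a normal literal.
risky = "D:\temp"
safe = r"D:\temp"
print(len(risky), len(safe))
```

The last line prints different lengths because the ordinary literal collapsed `\t` into a single tab character.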

In [14]:
# OPTION 2 - define/force a schema 
# --------------------------------
bakery_schema = StructType([
    StructField('date', StringType(), True),
    StructField('time', StringType(), True),
    StructField('transaction', IntegerType(), True),
    StructField('item', StringType(), True)
])

df_bakery1 = spark.read \
    .format("csv") \
    .option("header", "true") \
    .load(r"D:\school\data\BreadBasket_DMS.csv", schema=bakery_schema)  # raw string keeps backslashes

df_bakery1.printSchema()
df_bakery1.show(10)
df_bakery1.count()

root
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- transaction: integer (nullable = true)
 |-- item: string (nullable = true)

+----------+--------+-----------+-------------+
|      date|    time|transaction|         item|
+----------+--------+-----------+-------------+
|2016-10-30|09:58:11|          1|        Bread|
|2016-10-30|10:05:34|          2| Scandinavian|
|2016-10-30|10:05:34|          2| Scandinavian|
|2016-10-30|10:07:57|          3|Hot chocolate|
|2016-10-30|10:07:57|          3|          Jam|
|2016-10-30|10:07:57|          3|      Cookies|
|2016-10-30|10:08:41|          4|       Muffin|
|2016-10-30|10:13:03|          5|       Coffee|
|2016-10-30|10:13:03|          5|       Pastry|
|2016-10-30|10:13:03|          5|        Bread|
+----------+--------+-----------+-------------+
only showing top 10 rows



21293
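
To illustrate what forcing a schema means, here is a plain-Python sketch using the standard `csv` module rather than Spark. The column names and types come from the schema, not from the header row, which is why the output above shows lowercase names and a plain string `date`. The helper `read_with_schema` is hypothetical, and the two sample rows are copied from the output above.

```python
import csv
import io

# (column name, casting function) pairs, in file column order
schema = [("date", str), ("time", str), ("transaction", int), ("item", str)]

sample_csv = (
    "Date,Time,Transaction,Item\n"
    "2016-10-30,09:58:11,1,Bread\n"
    "2016-10-30,10:05:34,2,Scandinavian\n"
)


def read_with_schema(text, schema):
    """Parse CSV text, ignoring header names and applying the schema by position."""
    reader = csv.reader(io.StringIO(text))
    next(reader)  # skip the header row; names are taken from the schema instead
    return [
        {name: cast(value) for (name, cast), value in zip(schema, record)}
        for record in reader
    ]


rows = read_with_schema(sample_csv, schema)
for row in rows:
    print(row)
```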

In [15]:
# explore some of the data
itemNames = df_bakery1.select("item").distinct()

#itemNames.orderBy("item").show(itemNames.count())
itemNames.orderBy("item").show(20)
itemNames.count()


+--------------------+
|                item|
+--------------------+
|          Adjustment|
|Afternoon with th...|
|           Alfajores|
|     Argentina Night|
|            Art Tray|
|               Bacon|
|            Baguette|
|            Bakewell|
|        Bare Popcorn|
|              Basket|
|       Bowl Nic Pitt|
|               Bread|
|       Bread Pudding|
|  Brioche and salami|
|             Brownie|
|                Cake|
|       Caramel bites|
|Cherry me Dried f...|
|        Chicken Stew|
|        Chicken sand|
+--------------------+
only showing top 20 rows



95

### JSON

In [16]:
# we can also load JSON data

restaurants = spark.read.json(r"D:\school\data\Restaurants_in_Durham_County_NC.json")
restaurants.printSchema()
restaurants.show(5)



root
 |-- datasetid: string (nullable = true)
 |-- fields: struct (nullable = true)
 |    |-- closing_date: string (nullable = true)
 |    |-- est_group_desc: string (nullable = true)
 |    |-- geolocation: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- hours_of_operation: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- insp_freq: long (nullable = true)
 |    |-- opening_date: string (nullable = true)
 |    |-- premise_address1: string (nullable = true)
 |    |-- premise_address2: string (nullable = true)
 |    |-- premise_city: string (nullable = true)
 |    |-- premise_name: string (nullable = true)
 |    |-- premise_phone: string (nullable = true)
 |    |-- premise_state: string (nullable = true)
 |    |-- premise_zip: string (nullable = true)
 |    |-- risk: long (nullable = true)
 |    |-- rpt_area_desc: string (nullable = true)
 |    |-- seats: long (nullable = true)
 |    |-- sewage: string (nullable = true)
 |   

In [17]:
# let's examine it closer
# notice how the inner 'fields' is defined as a structure, and geolocation is an array inside the 
# inner structure
#

# we can access inner structures with the dot operator
fields = restaurants.select("recordid","fields.id","fields.geolocation","fields.opening_date")
fields.printSchema()
fields.show(5)

# how do we access the inner array for geolocation?

newFields = fields.select(
    "recordid",
    "id",
    fields["geolocation"].getItem(0).alias("latitude"),
    fields["geolocation"].getItem(1).alias("longitude"),
)
newFields.show(5)


root
 |-- recordid: string (nullable = true)
 |-- id: string (nullable = true)
 |-- geolocation: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- opening_date: string (nullable = true)

+--------------------+-----+--------------------+------------+
|            recordid|   id|         geolocation|opening_date|
+--------------------+-----+--------------------+------------+
|1644654b953d1802c...|56060|[35.9207272, -78....|  1994-09-01|
|93573dbf8c9e799d8...|58123|[36.0467802, -78....|  2003-10-15|
|0d274200c7cef50d0...|70266|[35.9182655, -78....|  2009-07-09|
|cf3e0b175a6ebad2a...|97837|[36.0183378, -78....|  2012-01-09|
|e796570677f7c39cc...|60690|[36.0556347, -78....|  2008-06-02|
+--------------------+-----+--------------------+------------+
only showing top 5 rows

+--------------------+-----+----------+-----------+
|            recordid|   id|  latitude|  longitude|
+--------------------+-----+----------+-----------+
|1644654b953d1802c...|56060|35.9207272

## Working in SQL or DataFrames

## Transformations


### JOINS


In [18]:
# join on a common column(s)

newDf = restaurants.join(newFields, restaurants['recordid'] == newFields['recordid'])
newDf.printSchema()

root
 |-- datasetid: string (nullable = true)
 |-- fields: struct (nullable = true)
 |    |-- closing_date: string (nullable = true)
 |    |-- est_group_desc: string (nullable = true)
 |    |-- geolocation: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- hours_of_operation: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- insp_freq: long (nullable = true)
 |    |-- opening_date: string (nullable = true)
 |    |-- premise_address1: string (nullable = true)
 |    |-- premise_address2: string (nullable = true)
 |    |-- premise_city: string (nullable = true)
 |    |-- premise_name: string (nullable = true)
 |    |-- premise_phone: string (nullable = true)
 |    |-- premise_state: string (nullable = true)
 |    |-- premise_zip: string (nullable = true)
 |    |-- risk: long (nullable = true)
 |    |-- rpt_area_desc: string (nullable = true)
 |    |-- seats: long (nullable = true)
 |    |-- sewage: string (nullable = true)
 |   

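Note that when the join condition is a column expression, as above, Spark keeps the join key from both DataFrames, so it appears twice in the result.
To avoid this, pass the key column names as a list:

`df1.join(df2, [fields...], 'TYPE-OF-JOIN')`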

where TYPE-OF-JOIN can be

- left
- right
- inner
- fullouter

For example: 

In [19]:
newDf = restaurants.join(newFields, ['recordid'], 'inner')
newDf.printSchema()

root
 |-- recordid: string (nullable = true)
 |-- datasetid: string (nullable = true)
 |-- fields: struct (nullable = true)
 |    |-- closing_date: string (nullable = true)
 |    |-- est_group_desc: string (nullable = true)
 |    |-- geolocation: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- hours_of_operation: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- insp_freq: long (nullable = true)
 |    |-- opening_date: string (nullable = true)
 |    |-- premise_address1: string (nullable = true)
 |    |-- premise_address2: string (nullable = true)
 |    |-- premise_city: string (nullable = true)
 |    |-- premise_name: string (nullable = true)
 |    |-- premise_phone: string (nullable = true)
 |    |-- premise_state: string (nullable = true)
 |    |-- premise_zip: string (nullable = true)
 |    |-- risk: long (nullable = true)
 |    |-- rpt_area_desc: string (nullable = true)
 |    |-- seats: long (nullable = true)
 |    |-
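
The difference between the two join forms can be sketched in plain Python. This is only an analogy, not Spark code: the helper `inner_join` is hypothetical, the row values are made up, and each output row is a list of `(column, value)` pairs so that a repeated column name can be represented.

```python
def inner_join(left, right, key, dedupe_key):
    """Inner-join two lists of dict rows on `key`.

    dedupe_key=False mimics joining on an expression: all columns from both
    sides are kept, so the key appears twice.
    dedupe_key=True mimics joining on a list of names: the key appears once,
    first, followed by the remaining columns of each side.
    """
    joined = []
    for l_row in left:
        for r_row in right:
            if l_row[key] != r_row[key]:
                continue
            if dedupe_key:
                row = [(key, l_row[key])]
                row += [(c, v) for c, v in l_row.items() if c != key]
                row += [(c, v) for c, v in r_row.items() if c != key]
            else:
                row = list(l_row.items()) + list(r_row.items())
            joined.append(row)
    return joined


# Made-up rows standing in for `restaurants` and `newFields`
left = [{"datasetid": "d1", "recordid": "a"}, {"datasetid": "d1", "recordid": "b"}]
right = [{"recordid": "a", "latitude": 35.9, "longitude": -78.9}]

dup = inner_join(left, right, "recordid", dedupe_key=False)
dedup = inner_join(left, right, "recordid", dedupe_key=True)
print([c for c, _ in dup[0]])
print([c for c, _ in dedup[0]])
```

With the duplicated form, any later reference to `recordid` is ambiguous, which is the practical reason to prefer the list form.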

### SELECTS

In [20]:
from pyspark.sql.functions import col, asc, desc

# select some columns
df = df_bakery.select("date", "item")
df.show(5)


+-------------------+-------------+
|               date|         item|
+-------------------+-------------+
|2016-10-30 00:00:00|        Bread|
|2016-10-30 00:00:00| Scandinavian|
|2016-10-30 00:00:00| Scandinavian|
|2016-10-30 00:00:00|Hot chocolate|
|2016-10-30 00:00:00|          Jam|
+-------------------+-------------+
only showing top 5 rows



In [21]:
# filter rows; there are several equivalent ways to write the condition
# (sorting and grouping follow in the next cell)


# filter; columns can be referenced as attributes or with col()
df2 = df_bakery1.filter(df_bakery1.item != 'NONE')
df2.show(5)

count = df_bakery1.filter(col("item") != "NONE").count()
display(count)


# both filter forms return the same count
display(count == df_bakery1.filter(df_bakery1.item != 'NONE').count())

# another way to filter
df_bakery \
  .where(col("item") != "NONE") \
  .where(col("date") == "2016-10-30") \
  .count()

+----------+--------+-----------+-------------+
|      date|    time|transaction|         item|
+----------+--------+-----------+-------------+
|2016-10-30|09:58:11|          1|        Bread|
|2016-10-30|10:05:34|          2| Scandinavian|
|2016-10-30|10:05:34|          2| Scandinavian|
|2016-10-30|10:07:57|          3|Hot chocolate|
|2016-10-30|10:07:57|          3|          Jam|
+----------+--------+-----------+-------------+
only showing top 5 rows



20507

True

170

In [22]:
from pyspark.sql.functions import asc, desc

# sort ascending
df2.orderBy(asc("item")).show(5)

# sort multiples, ascending, then descending
df2.orderBy(asc("item"),desc("transaction")).show(5)

# group, aggregate (summing transaction IDs only demonstrates the API)
df2.groupBy("item").sum("transaction").show(5)

# renaming; two ways...
df2.groupBy("item").sum("transaction").withColumnRenamed("sum(transaction)","total").show(5)
df2.groupBy("item").sum("transaction").toDF("item", "total").show(5)


+----------+--------+-----------+--------------------+
|      date|    time|transaction|                item|
+----------+--------+-----------+--------------------+
|2016-11-09|19:49:22|        938|          Adjustment|
|2017-01-11|14:51:56|       4541|Afternoon with th...|
|2017-01-07|13:19:07|       4322|Afternoon with th...|
|2017-01-06|14:17:59|       4263|Afternoon with th...|
|2017-01-08|15:28:34|       4405|Afternoon with th...|
+----------+--------+-----------+--------------------+
only showing top 5 rows

+----------+--------+-----------+--------------------+
|      date|    time|transaction|                item|
+----------+--------+-----------+--------------------+
|2016-11-09|19:49:22|        938|          Adjustment|
|2017-04-08|10:44:44|       9579|Afternoon with th...|
|2017-04-08|10:43:56|       9578|Afternoon with th...|
|2017-03-19|18:33:00|       8455|Afternoon with th...|
|2017-03-18|10:45:13|       8342|Afternoon with th...|
+----------+--------+-----------+-------

In [23]:
# first, register the DataFrame as a temporary view so that Spark SQL can query it by name
#
#  .registerTempTable("bakery") is the older name for the same operation; it has been
#  deprecated since Spark 2.0 in favor of
#    .createOrReplaceTempView("bakery")
#
df_bakery1.createOrReplaceTempView("bakery")

# filter again, but in SQL
df2 = spark.sql("SELECT date, transaction, item " +
                       "FROM bakery " +
                       "WHERE item NOT LIKE 'NONE' " +
                       "ORDER BY transaction")
df2.show(5)
df2.createOrReplaceTempView("bakery2")
df2.count()


+----------+-----------+-------------+
|      date|transaction|         item|
+----------+-----------+-------------+
|2016-10-30|          1|        Bread|
|2016-10-30|          2| Scandinavian|
|2016-10-30|          2| Scandinavian|
|2016-10-30|          3|Hot chocolate|
|2016-10-30|          3|          Jam|
+----------+-----------+-------------+
only showing top 5 rows



20507

In [24]:
# perform aggregate (count) by date, after filtering for dates
df3 = spark.sql("SELECT date, count(*) as count " +
                       "FROM bakery2 " +
                       "WHERE date >= '2017-01-01' " +
                       "GROUP BY date " +
                       "ORDER BY date")

df3.printSchema()
df3.show(5)
df3.count()


root
 |-- date: string (nullable = true)
 |-- count: long (nullable = false)

+----------+-----+
|      date|count|
+----------+-----+
|2017-01-01|    1|
|2017-01-03|   87|
|2017-01-04|   76|
|2017-01-05|   95|
|2017-01-06|   84|
+----------+-----+
only showing top 5 rows



98
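
The shape of the query above can be tried without a Spark installation by swapping in Python's built-in `sqlite3` module. This is a sketch with made-up rows, not the bakery dataset, and the column is named `txn` here to avoid clashing with a SQLite keyword. It also shows why `date >= '2017-01-01'` works on a string column: ISO-formatted dates sort correctly as text.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE bakery2 (date TEXT, txn INTEGER, item TEXT)")
conn.executemany(
    "INSERT INTO bakery2 VALUES (?, ?, ?)",
    [
        ("2016-12-31", 1, "Bread"),    # filtered out by the WHERE clause
        ("2017-01-01", 2, "Coffee"),
        ("2017-01-03", 3, "Bread"),
        ("2017-01-03", 3, "Jam"),
    ],
)

# Same structure as the Spark SQL query: filter, group, count, order.
result = conn.execute(
    "SELECT date, count(*) AS n_items "
    "FROM bakery2 "
    "WHERE date >= '2017-01-01' "
    "GROUP BY date "
    "ORDER BY date"
).fetchall()
conn.close()

for date, n_items in result:
    print(date, n_items)
```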

### Map and flatMap



In [25]:
# map example: multiply count by 2
df4 = df3.rdd.map(lambda x: (x[0], x[1]*2)).toDF()
df4.printSchema()
df4.withColumnRenamed("_1","date").show(5)

root
 |-- _1: string (nullable = true)
 |-- _2: long (nullable = true)

+----------+---+
|      date| _2|
+----------+---+
|2017-01-01|  2|
|2017-01-03|174|
|2017-01-04|152|
|2017-01-05|190|
|2017-01-06|168|
+----------+---+
only showing top 5 rows



### flatMap
`flatMap` is a transformation: it applies a function to each element of an RDD and returns the results as a new RDD. It is similar to `map`, but the function may return zero, one, or more output elements per input element, and these are flattened into a single RDD. As with `map`, the function can contain any custom logic.

In [26]:
x = sc.parallelize(["spark rdd example", "sample example"], 2)

# map returns one result per input element, so splitting gives a list of lists
y = x.map(lambda line: line.split(' '))
print(y.collect())

# flatMap flattens the per-element results into a single list of words
y = x.flatMap(lambda line: line.split(' '))
print(y.collect())

[['spark', 'rdd', 'example'], ['sample', 'example']]
['spark', 'rdd', 'example', 'sample', 'example']
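
The same contrast can be reproduced in plain Python, which may help when reasoning about what each operation returns. This sketch does not use Spark; `flat_map` is a hypothetical helper built on `itertools.chain`.

```python
from itertools import chain


def flat_map(func, items):
    """Apply func to every item and concatenate the resulting iterables."""
    return list(chain.from_iterable(func(item) for item in items))


lines = ["spark rdd example", "sample example"]

# map: exactly one output (a list of words) per input line
mapped = [line.split(" ") for line in lines]
print(mapped)

# flat_map: the per-line lists are flattened into one list of words
flattened = flat_map(lambda line: line.split(" "), lines)
print(flattened)

# The function may return zero or more elements per input:
# here only words longer than five characters are kept.
long_words = flat_map(
    lambda line: [w for w in line.split(" ") if len(w) > 5], lines
)
print(long_words)
```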


### Stop the Spark Session

In [27]:
spark.stop()