# SPARK : READ, TRANSFORM & PROCESS @ SCALE

Students' Names : Sarah Gutierez & Adrien Houpert

##  pyspark official documentation

Based on the course slides and the Spark documentation available online, write, for each cell, the Spark code that answers the instructions.

Follow the tutorials and search the internet to get hands-on experience with pyspark. The notebook gives you some suggestions, but you are free to try new things and explore the possibilities pyspark offers.

List of tutorials to follow :

    - The pyspark API quickstart : https://spark.apache.org/docs/latest/api/python/getting_started/index.html
        Please be sure to read the DataFrame API before going further.
    - The Spark SQL guide : https://spark.apache.org/docs/latest/sql-getting-started.html
        This is the most complete guide on how to use Spark SQL. Be sure to do the getting started and the data sources parts.

Important : Create cells to demonstrate the code you write.


In [1]:
! pip install pyspark

Defaulting to user installation because normal site-packages is not writeable


In [2]:
from pyspark.sql import SparkSession
import pandas as pd

##  pyspark basics

In [3]:
spark_application_name = "Spark_Application_Name"

In [4]:
spark = (SparkSession.builder.appName(spark_application_name).getOrCreate())

22/05/12 16:47:54 WARN Utils: Your hostname, Unikarah resolves to a loopback address: 127.0.1.1; using 192.168.1.96 instead (on interface wlo1)
22/05/12 16:47:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/12 16:47:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Using the createDataFrame function, create a Spark DataFrame with the values [("Brooke", 20), ("Denny", 31), ("Jules", 30),("TD", 35), ("Brooke", 25)] and the columns ["name", "age"]. Name this DataFrame data_df.

In [8]:

data_df = spark.createDataFrame([("Brooke", 20), ("Denny", 31), ("Jules", 30),("TD", 35), ("Brooke", 25)], ["name", "age"])


Write the line of code that will show the schema of data_df

In [9]:
data_df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)



Write the line of code that will display 3 values of the dataframe data_df

In [10]:
data_df.head(3)

                                                                                

[Row(name='Brooke', age=20),
 Row(name='Denny', age=31),
 Row(name='Jules', age=30)]

Write the line of code that will return a dataframe with 5 values of the data_df dataframe

In [13]:
data_df_5 = data_df.head(5)
data_df_5

[Row(name='Brooke', age=20),
 Row(name='Denny', age=31),
 Row(name='Jules', age=30),
 Row(name='TD', age=35),
 Row(name='Brooke', age=25)]

### Spark can read a wide range of data sources, with native support for many file types and databases

In the following, write the line of code that will allow you to read the json file specified in the json_file_path variable

In [19]:
json_file_path = "data/devices.json"

In [20]:
spark.read.json(json_file_path)

/home/unikarah/Documents/ING2/BigData


Write the line of code that will allow you to read a csv file specified in the csv_file_path variable

In [63]:
csv_file_path = "data/devices.csv"

In [64]:
spark.read.csv(csv_file_path)

DataFrame[_c0: string]

#### Function 1 : read_disp_info()
Write a function named read_disp_info that takes a file path (json or csv) as input. It creates a DataFrame from the file, displays its schema, displays the first 10 rows, and prints the total number of rows.
The function must handle both json and csv files: it can inspect the end of the file path string to determine whether the extension is csv or json, then call the matching reader. Finally, it returns the Spark DataFrame holding the data that was read.


In [112]:
import os

def read_disp_info(file):
    """Read a csv or json file, print its schema, first 10 rows and row count."""
    # Choose the reader from the file extension
    _, extension = os.path.splitext(file)
    if extension == '.csv':
        df = spark.read.csv(file)
    elif extension == '.json':
        df = spark.read.json(file)
    else:
        raise ValueError('Wrong file: csv or json needed')
    df.printSchema()
    print(df.head(10))
    print(f"\nNumber of rows {df.count()}")
    return df
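
The extension check above is case-sensitive, so a path ending in `.CSV` would be rejected. One possible refinement is to isolate the detection in a small helper that lowercases the extension. This is only a sketch: `detect_format` and `SUPPORTED_FORMATS` are hypothetical names introduced here, not part of the exercise.

```python
import os

# Hypothetical mapping from file extension to Spark data source name
SUPPORTED_FORMATS = {".csv": "csv", ".json": "json"}


def detect_format(path):
    """Return 'csv' or 'json' based on the file extension, ignoring case."""
    _, extension = os.path.splitext(path)
    try:
        return SUPPORTED_FORMATS[extension.lower()]
    except KeyError:
        raise ValueError(
            f"Unsupported extension {extension!r}: expected .csv or .json"
        ) from None


print(detect_format("data/devices.json"))
print(detect_format("DATA/DEVICES.CSV"))
```

The returned name could then be handed to the generic reader, for example `spark.read.format(detect_format(path)).load(path)`, instead of branching on the extension.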

Using this function, read the file located at "/home/adminefrei/Documents/data/devices.json" and assign the returned DataFrame to the variable devDF

In [113]:
devDF = read_disp_info("data/devices.json")

root
 |-- dev_type: string (nullable = true)
 |-- devnum: long (nullable = true)
 |-- make: string (nullable = true)
 |-- model: string (nullable = true)
 |-- release_dt: string (nullable = true)

[Row(dev_type='phone', devnum=1, make='Sorrento', model='F00L', release_dt='2008-10-21T00:00:00.000-07:00'), Row(dev_type='phone', devnum=2, make='Titanic', model='2100', release_dt='2010-04-19T00:00:00.000-07:00'), Row(dev_type='phone', devnum=3, make='MeeToo', model='3.0', release_dt='2011-02-18T00:00:00.000-08:00'), Row(dev_type='phone', devnum=4, make='MeeToo', model='3.1', release_dt='2011-09-21T00:00:00.000-07:00'), Row(dev_type='phone', devnum=5, make='iFruit', model='1', release_dt='2008-10-21T00:00:00.000-07:00'), Row(dev_type='phone', devnum=6, make='iFruit', model='3', release_dt='2011-11-02T00:00:00.000-07:00'), Row(dev_type='phone', devnum=7, make='iFruit', model='2', release_dt='2010-05-20T00:00:00.000-07:00'), Row(dev_type='phone', devnum=8, make='iFruit', model='5', release_dt

DataFrame transformations typically return another DataFrame. Try using a select to return a DataFrame with only the make and model columns, name this dataframe makeModelDF

In [95]:
makeModelDF = devDF.select('make', 'model')

Display the schema of makeModelDF. What can you deduce?

In [96]:
makeModelDF.printSchema()

root
 |-- make: string (nullable = true)
 |-- model: string (nullable = true)



Transformations in a query can be chained together. Write the line of code that selects the columns devnum, make, and model and keeps only the rows where make is equal to "Ronin"

In [97]:
devDF.select("devnum", "make", "model").where(devDF.make == "Ronin").show()

+------+-----+--------------+
|devnum| make|         model|
+------+-----+--------------+
|    15|Ronin|Novelty Note 1|
|    17|Ronin|Novelty Note 3|
|    18|Ronin|Novelty Note 2|
|    19|Ronin|Novelty Note 4|
|    46|Ronin|            S4|
|    47|Ronin|            S1|
|    48|Ronin|            S3|
|    49|Ronin|            S2|
+------+-----+--------------+



Spark can also read parquet files; parquet is one of the most widely used formats on HDFS. Search the documentation and write the line of code that reads the parquet file located at parquet_path

In [98]:
parquet_path = "data/base_stations.parquet"

In [99]:
stations_df =  spark.read.parquet(parquet_path)
stations_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- zip: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)



Again, select the id, lat and lon columns from this DataFrame, keeping only the rows where city is equal to "Scottsdale"

In [100]:
stations_df.select('id', 'lat', 'lon').where(stations_df.city == 'Scottsdale').show()

+---+-------+---------+
| id|    lat|      lon|
+---+-------+---------+
| 14|33.6165|-111.9554|
| 15|33.6968|-111.8892|
+---+-------+---------+



### Schema Definition of a DataFrame

Create a new DataFrame based on the devices.json file.

In [101]:
df = spark.read.json("data/devices.json")

View the schema of the devDF DataFrame. Note the column names and types that Spark inferred from the JSON file. In particular, note that the release_dt column is of type string, whereas the data in the column actually represents a timestamp

In [102]:
df.printSchema()

root
 |-- dev_type: string (nullable = true)
 |-- devnum: long (nullable = true)
 |-- make: string (nullable = true)
 |-- model: string (nullable = true)
 |-- release_dt: string (nullable = true)



Defining a schema that correctly specifies the column types for this DataFrame starts by importing the package with the definitions of necessary classes and types

In [103]:
from pyspark.sql.types import StructType, StructField, LongType, StringType, TimestampType

Next, we create a collection of StructField objects, which represent column definitions. The release_dt column will be a timestamp

In [104]:
devColumns = [
    StructField("devnum", LongType()),
    StructField("make", StringType()),
    StructField("model", StringType()),
    StructField("release_dt", TimestampType()),
    StructField("dev_type", StringType()),
]

Create a schema (a StructType object) using the column definition list.

In [105]:
devSchema = StructType(devColumns)

Now that you have created devSchema, search the documentation for how to specify the schema when you create the DataFrame from the json_file_path ("/path/to/devices.json")

In [106]:
dev_df = spark.read.schema(devSchema).json("data/devices.json")

View the schema and data of the new DataFrame, and confirm that the release_dt column type is now timestamp

In [107]:
dev_df.printSchema()

root
 |-- devnum: long (nullable = true)
 |-- make: string (nullable = true)
 |-- model: string (nullable = true)
 |-- release_dt: timestamp (nullable = true)
 |-- dev_type: string (nullable = true)



Write the line of code that correctly reads the csv file at account_device_path. Search how to set the delimiter and how to take the header into account when reading a csv file

In [108]:
account_device_path = "data/accountdevice/part-00000-f3b62dad-1054-4b2e-81fd-26e54c2ae76a.csv"

In [109]:
spark.read.option("delimiter", ",").option("header", True).csv(account_device_path).show()

+-----+----------+---------+---------------+--------------------+
|   id|account_id|device_id|activation_date|   account_device_id|
+-----+----------+---------+---------------+--------------------+
|48692|     32443|       29|  1393242509000|7351fed1-f344-4cd...|
|48693|     32444|        4|  1353649861000|6da22278-ff7a-461...|
|48694|     32445|        9|  1331819465000|cb993b85-6775-407...|
|48695|     32446|       43|  1336860950000|48ea2c09-a0df-4d1...|
|48696|     32446|       29|  1383650663000|4b49c0a6-d141-42e...|
|48697|     32447|        6|  1342578469000|cc8e8361-3d67-4be...|
|48698|     32447|       29|  1386643231000|b40dba90-b073-405...|
|48699|     32448|        5|  1350883104000|f088d30f-1e1c-47f...|
|48700|     32448|       29|  1383321310000|2805df93-2e89-433...|
|48701|     32449|       34|  1333225574000|e0e7edbe-77fc-421...|
|48702|     32450|       13|  1341361160000|b3ce579f-48e3-43e...|
|48703|     32451|       21|  1329524457000|857bbe4f-028c-475...|
|48704|   

Update the function read_disp_info() by adding two optional parameters, header and delimiter, with the default values header = False and delimiter = ";".

In [110]:
import os

def updated_read_disp_info(file, header=False, delimiter=';'):
    """Same as read_disp_info, with csv header and delimiter options."""
    _, extension = os.path.splitext(file)
    print(extension)
    if extension == '.csv':
        # header and delimiter only apply to csv files
        df = spark.read.option("delimiter", delimiter).option("header", header).csv(file)
    elif extension == '.json':
        df = spark.read.json(file)
    else:
        raise ValueError('Wrong file: csv or json needed')
    df.printSchema()
    print(df.head(10))
    print(f"\nNumber of rows {df.count()}")
    return df

Now, using the function defined, test it to read the account_device_path file

In [111]:
updated_read_disp_info(account_device_path, True, ",").show()

.csv
root
 |-- id: string (nullable = true)
 |-- account_id: string (nullable = true)
 |-- device_id: string (nullable = true)
 |-- activation_date: string (nullable = true)
 |-- account_device_id: string (nullable = true)

[Row(id='48692', account_id='32443', device_id='29', activation_date='1393242509000', account_device_id='7351fed1-f344-4cdc-91b7-509ca3e0aee6'), Row(id='48693', account_id='32444', device_id='4', activation_date='1353649861000', account_device_id='6da22278-ff7a-4618-bc93-601207bedff3'), Row(id='48694', account_id='32445', device_id='9', activation_date='1331819465000', account_device_id='cb993b85-6775-4071-8292-3371e173b797'), Row(id='48695', account_id='32446', device_id='43', activation_date='1336860950000', account_device_id='48ea2c09-a0df-4d17-b1da-06a989f4270c'), Row(id='48696', account_id='32446', device_id='29', activation_date='1383650663000', account_device_id='4b49c0a6-d141-42e6-b3d2-767338bfbece'), Row(id='48697', account_id='32447', device_id='6', activa