# Manipulate S3 data with PySpark SQL on `AWS EMR`
  
> [PySpark SQL API Documentation](<https://spark.apache.org/docs/latest/api/python/pyspark.sql.html>)

In [1]:
%%spark

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1585429618764_0003,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## import S3 data without inferring its schema

In [2]:
# Plain read: no separator, header or schema hints are given to the CSV reader.
df = spark.read.csv(
    path="s3a://udacity-dend/pagila/payment/payment.csv"
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
# Print the column names and data types Spark assigned to the plain read above.
df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- _c0: string (nullable = true)

In [4]:
# Preview the first 5 rows; truncate=False keeps long cell values intact.
df.show(5, truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------------------------------------------------+
|_c0                                                          |
+-------------------------------------------------------------+
|payment_id;customer_id;staff_id;rental_id;amount;payment_date|
|16050;269;2;7;1.99;2017-01-24 21:40:19.996577+00             |
|16051;269;1;98;0.99;2017-01-25 15:16:50.996577+00            |
|16052;269;2;678;6.99;2017-01-28 21:44:14.996577+00           |
|16053;269;2;703;0.99;2017-01-29 00:58:02.996577+00           |
+-------------------------------------------------------------+
only showing top 5 rows

## import S3 data
This time using additional parameters to properly split the csv file contents:

In [5]:
# Same file, this time describing its layout to the CSV reader:
#   sep         -> fields are separated by ';'
#   header      -> the first line holds the column names
#   inferSchema -> let Spark derive each column's data type from the data
schema_df = (
    spark.read
    .options(sep=';', header=True, inferSchema=True)
    .csv("s3a://udacity-dend/pagila/payment/payment.csv")
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# Preview the first 5 rows of the properly split DataFrame, without truncation.
schema_df.show(5, truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+-----------+--------+---------+------+-----------------------------+
|payment_id|customer_id|staff_id|rental_id|amount|payment_date                 |
+----------+-----------+--------+---------+------+-----------------------------+
|16050     |269        |2       |7        |1.99  |2017-01-24 21:40:19.996577+00|
|16051     |269        |1       |98       |0.99  |2017-01-25 15:16:50.996577+00|
|16052     |269        |2       |678      |6.99  |2017-01-28 21:44:14.996577+00|
|16053     |269        |2       |703      |0.99  |2017-01-29 00:58:02.996577+00|
|16054     |269        |1       |750      |4.99  |2017-01-29 08:10:06.996577+00|
+----------+-----------+--------+---------+------+-----------------------------+
only showing top 5 rows

In [7]:
# Print the data types that inferSchema=True assigned to each column.
schema_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- payment_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- staff_id: integer (nullable = true)
 |-- rental_id: integer (nullable = true)
 |-- amount: double (nullable = true)
 |-- payment_date: string (nullable = true)

## the `payment_date` column's been ingested as a `string`
data type when it clearly should be of type `timestamp`.  
  
We'll fix this with Spark SQL's `functions` module.  
  
> [PySpark SQL Functions Docs](<https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions>)

In [8]:
import pyspark.sql.functions as sqlFunctions

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## create a new DataFrame
  
* use the `withColumn()` function to replace the `payment_date` column;  
  
> `withColumn()` → Returns a new DataFrame by adding a column or replacing the existing column that has the same name.
  
* use the `to_timestamp()` function to convert the original `payment_date` column to a real `TimestampType` data type;  
  
> `to_timestamp` → Converts a Column of pyspark.sql.types.StringType into pyspark.sql.types.TimestampType using the optionally specified format.


In [9]:
# Build the timestamp expression first, then swap it in: because the new column
# reuses the name 'payment_date', withColumn() replaces the original string column.
payment_ts = sqlFunctions.to_timestamp(sqlFunctions.col('payment_date'))
dfPayments = schema_df.withColumn('payment_date', payment_ts)

# confirm the resulting data types
dfPayments.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- payment_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- staff_id: integer (nullable = true)
 |-- rental_id: integer (nullable = true)
 |-- amount: double (nullable = true)
 |-- payment_date: timestamp (nullable = true)

In [10]:
# Preview the first 5 rows after the timestamp conversion, without truncation.
dfPayments.show(5, truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+-----------+--------+---------+------+--------------------------+
|payment_id|customer_id|staff_id|rental_id|amount|payment_date              |
+----------+-----------+--------+---------+------+--------------------------+
|16050     |269        |2       |7        |1.99  |2017-01-24 21:40:19.996577|
|16051     |269        |1       |98       |0.99  |2017-01-25 15:16:50.996577|
|16052     |269        |2       |678      |6.99  |2017-01-28 21:44:14.996577|
|16053     |269        |2       |703      |0.99  |2017-01-29 00:58:02.996577|
|16054     |269        |1       |750      |4.99  |2017-01-29 08:10:06.996577|
+----------+-----------+--------+---------+------+--------------------------+
only showing top 5 rows

## add a column having the month
of payment transaction. Use PySpark SQL's `month()` function to achieve this:

In [11]:
# Extract the month number from the timestamp and attach it as 'payment_month'.
month_of_payment = sqlFunctions.month(sqlFunctions.col('payment_date'))
dfPayments = dfPayments.withColumn('payment_month', month_of_payment)

# preview the first 5 rows including the new column
dfPayments.show(5, truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+-----------+--------+---------+------+--------------------------+-------------+
|payment_id|customer_id|staff_id|rental_id|amount|payment_date              |payment_month|
+----------+-----------+--------+---------+------+--------------------------+-------------+
|16050     |269        |2       |7        |1.99  |2017-01-24 21:40:19.996577|1            |
|16051     |269        |1       |98       |0.99  |2017-01-25 15:16:50.996577|1            |
|16052     |269        |2       |678      |6.99  |2017-01-28 21:44:14.996577|1            |
|16053     |269        |2       |703      |0.99  |2017-01-29 00:58:02.996577|1            |
|16054     |269        |1       |750      |4.99  |2017-01-29 08:10:06.996577|1            |
+----------+-----------+--------+---------+------+--------------------------+-------------+
only showing top 5 rows

## create a temporary view based on this DataFrame
by using the DataFrame's `createOrReplaceTempView()` method:

In [12]:
# Register the DataFrame under a name that spark.sql() queries can reference.
dfPayments.createOrReplaceTempView(name='payments_view')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## use SparkSession's `sql()` method
to query the temporary view we've just created:

In [13]:
# Total revenue per payment month, highest-earning month first.
monthly_revenue = spark.sql("""
    SELECT
         payment_month
        ,SUM(amount) AS total_revenue
    FROM
        payments_view
    GROUP BY
        payment_month
    ORDER BY
        total_revenue DESC    
""")
monthly_revenue.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------+------------------+
|payment_month|     total_revenue|
+-------------+------------------+
|            4|28559.460000003943|
|            3|23886.560000002115|
|            2| 9631.879999999608|
|            1| 4824.429999999856|
|            5|  514.180000000001|
+-------------+------------------+

## predetermine the schema before data ingestion  
  
Import PySpark SQL's `types` module to make Spark's native data types available.  
  
The `types` module defines the `DataType` base class and its concrete subclasses (`IntegerType`, `DoubleType`, `DateType`, ...).
  
> [PySpark SQL's `types` Module Docs](<https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.types>)  
  
  
> *KEEP IN MIND*: each data type must be instantiated, i.e. the class name MUST be followed by parentheses. E.g.: `IntegerType()`

In [25]:
import pyspark.sql.types as sqlTypes

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## set the csv file schema
into a variable that basically contains a list of the expected columns and their corresponding Spark data types.

In [37]:
# Explicit schema for payment.csv: one nullable StructField per column, in file order.
# NOTE: 'payment_date' is declared as DateType, so only the calendar date is kept
# on read; the time-of-day present in the raw file is dropped.
paymentSchema = sqlTypes.StructType([
    sqlTypes.StructField(column_name, spark_type, True)
    for column_name, spark_type in (
        ("payment_id", sqlTypes.IntegerType()),
        ("customer_id", sqlTypes.IntegerType()),
        ("staff_id", sqlTypes.IntegerType()),
        ("rental_id", sqlTypes.IntegerType()),
        ("amount", sqlTypes.DoubleType()),
        ("payment_date", sqlTypes.DateType()),
    )
])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## pass the csv file schema
to the `read.csv()` method:

In [38]:

# Read the file with the predefined schema instead of inferring the types:
# the reader is configured step by step, then pointed at the CSV file.
paymentSchemaOnRead = (
    spark.read
    .schema(paymentSchema)
    .option('sep', ';')
    .option('header', True)
    .csv('s3a://udacity-dend/pagila/payment/payment.csv')
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [39]:
# Preview the first 5 rows read with the explicit schema, without truncation.
paymentSchemaOnRead.show(5, truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+-----------+--------+---------+------+------------+
|payment_id|customer_id|staff_id|rental_id|amount|payment_date|
+----------+-----------+--------+---------+------+------------+
|16050     |269        |2       |7        |1.99  |2017-01-24  |
|16051     |269        |1       |98       |0.99  |2017-01-25  |
|16052     |269        |2       |678      |6.99  |2017-01-28  |
|16053     |269        |2       |703      |0.99  |2017-01-29  |
|16054     |269        |1       |750      |4.99  |2017-01-29  |
+----------+-----------+--------+---------+------+------------+
only showing top 5 rows

In [40]:
# Print the schema to confirm the columns carry the types declared in paymentSchema.
paymentSchemaOnRead.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- payment_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- staff_id: integer (nullable = true)
 |-- rental_id: integer (nullable = true)
 |-- amount: double (nullable = true)
 |-- payment_date: date (nullable = true)