### Note: In this notebook, we are accessing data hosted in a remote S3 bucket and loading it all into the memory of the machine hosting this Jupyter Notebook. 
### It is not recommended that you do this in production. 
### This would work more seamlessly if you are using a machine inside Amazon, preferably in the same region as the S3 bucket. 
### This notebook just demonstrates the ability of Spark to deal with S3 data.

In [1]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
import os
import configparser

### Load AWS credentials

In [None]:
# Shell out to the AWS CLI to copy payment.csv from S3 into the current directory.
# NOTE(review): this cell sits before the cell that exports AWS_ACCESS_KEY_ID /
# AWS_SECRET_ACCESS_KEY, so the CLI presumably has to find credentials elsewhere - confirm.
! aws s3 cp s3://udacity-dend/pagila/payment/payment.csv .


In [None]:
# List the current working directory (e.g. to check the result of the preceding `aws s3 cp` cell).
! ls

In [2]:
# Read the AWS key pair from the local credentials file and export it as
# environment variables for this process.
config = configparser.ConfigParser()

# Use a context manager so the file handle is closed as soon as parsing is done
# (the bare open(...) call previously left it open).
with open('aws/credentials.cfg') as credentials_file:
    config.read_file(credentials_file)

# Both keys are read from the [AWS] section; a missing section or key raises KeyError.
os.environ["AWS_ACCESS_KEY_ID"] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"] = config['AWS']['AWS_SECRET_ACCESS_KEY']

### Create Spark Session with hadoop-aws package

In [3]:
# Build (or reuse) the Spark session and request the hadoop-aws package via
# spark.jars.packages.
# NOTE(review): the pinned 3.2.0 presumably has to match the Hadoop version
# reported by the "Hadoop version" cell further down - confirm.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.0")
    .getOrCreate()
)

In [None]:
# S3A settings to push into the Hadoop configuration of the running session,
# listed in the order they are applied.
s3a_settings = [
    # Multipart size in bytes: 104857600 = 100 * 1024 * 1024.
    ("fs.s3a.multipart.size", "104857600"),
    # Key pair taken from the credentials file parsed earlier into `config`.
    ("fs.s3a.access.key", config['AWS']['AWS_ACCESS_KEY_ID']),
    ("fs.s3a.secret.key", config['AWS']['AWS_SECRET_ACCESS_KEY']),
    # Filesystem implementation class registered under fs.s3a.impl.
    ("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
]

for setting_name, setting_value in s3a_settings:
    spark._sc._jsc.hadoopConfiguration().set(setting_name, setting_value)

In [None]:
# Display all key/value pairs held in the SparkContext's SparkConf.
spark.sparkContext.getConf().getAll()

### Load data from S3

In [None]:
# Bare expression: the notebook renders the session object as this cell's output.
spark

In [None]:
# Keep a handle on the SparkContext, then ask the JVM (through the private
# `_jvm` gateway) for Hadoop's VersionInfo and print the version string.
sc = spark.sparkContext
print(f"Hadoop version = {sc._jvm.org.apache.hadoop.util.VersionInfo.getVersion()}")

In [4]:
# First attempt: read the CSV over s3a:// with no reader options. As the outputs
# below show, every line (including the header line) lands in one string column
# `_c0`, because the file is ';'-separated.
df = spark.read.csv("s3a://udacity-dend/pagila/payment/payment.csv")

In [5]:
# Print the column names and types of the naive read.
df.printSchema()

root
 |-- _c0: string (nullable = true)



In [6]:
# Preview the first five rows of the naive read.
df.show(5)

+--------------------+
|                 _c0|
+--------------------+
|payment_id;custom...|
|16050;269;2;7;1.9...|
|16051;269;1;98;0....|
|16052;269;2;678;6...|
|16053;269;2;703;0...|
+--------------------+
only showing top 5 rows



### Infer schema (Schema on read), fix header and separator

In [7]:
# Re-read the same file, this time splitting on ';', using the first line as
# column names and letting Spark infer each column's type from the data.
df = spark.read.csv(
    "s3a://udacity-dend/pagila/payment/payment.csv",
    sep=";",
    header=True,
    inferSchema=True,
)

In [8]:
# Print the inferred schema; payment_date is still inferred as a string here.
df.printSchema()

root
 |-- payment_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- staff_id: integer (nullable = true)
 |-- rental_id: integer (nullable = true)
 |-- amount: double (nullable = true)
 |-- payment_date: string (nullable = true)



In [9]:
# Preview the first five parsed rows.
df.show(5)

+----------+-----------+--------+---------+------+--------------------+
|payment_id|customer_id|staff_id|rental_id|amount|        payment_date|
+----------+-----------+--------+---------+------+--------------------+
|     16050|        269|       2|        7|  1.99|2017-01-24 21:40:...|
|     16051|        269|       1|       98|  0.99|2017-01-25 15:16:...|
|     16052|        269|       2|      678|  6.99|2017-01-28 21:44:...|
|     16053|        269|       2|      703|  0.99|2017-01-29 00:58:...|
|     16054|        269|       1|      750|  4.99|2017-01-29 08:10:...|
+----------+-----------+--------+---------+------+--------------------+
only showing top 5 rows



### Fix data: Cast payment_date to timestamp 

In [10]:
import pyspark.sql.functions as F

In [11]:
# Replace the string payment_date column with a parsed timestamp column.
# NOTE(review): the hours shown after this cast are 8 hours earlier than in the
# raw strings (21:40 -> 13:40 in the show() outputs); presumably the raw values
# carry a UTC offset and are rendered in the session time zone - confirm, since
# the month extracted later depends on it.
dfPayment = df.withColumn("payment_date", F.to_timestamp("payment_date"))

In [12]:
# Print the schema to verify that payment_date is now a timestamp.
dfPayment.printSchema()

root
 |-- payment_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- staff_id: integer (nullable = true)
 |-- rental_id: integer (nullable = true)
 |-- amount: double (nullable = true)
 |-- payment_date: timestamp (nullable = true)



In [13]:
# Preview the first five rows after the timestamp cast.
dfPayment.show(5)

+----------+-----------+--------+---------+------+--------------------+
|payment_id|customer_id|staff_id|rental_id|amount|        payment_date|
+----------+-----------+--------+---------+------+--------------------+
|     16050|        269|       2|        7|  1.99|2017-01-24 13:40:...|
|     16051|        269|       1|       98|  0.99|2017-01-25 07:16:...|
|     16052|        269|       2|      678|  6.99|2017-01-28 13:44:...|
|     16053|        269|       2|      703|  0.99|2017-01-28 16:58:...|
|     16054|        269|       1|      750|  4.99|2017-01-29 00:10:...|
+----------+-----------+--------+---------+------+--------------------+
only showing top 5 rows



### Extract the month

In [14]:
# Add a `month` column holding the month number extracted from payment_date.
# The result is assigned back to the same name, so dfPayment gains the column.
dfPayment = dfPayment.withColumn("month", F.month("payment_date"))

In [15]:
# Preview the first five rows, now including the month column.
dfPayment.show(5)

+----------+-----------+--------+---------+------+--------------------+-----+
|payment_id|customer_id|staff_id|rental_id|amount|        payment_date|month|
+----------+-----------+--------+---------+------+--------------------+-----+
|     16050|        269|       2|        7|  1.99|2017-01-24 13:40:...|    1|
|     16051|        269|       1|       98|  0.99|2017-01-25 07:16:...|    1|
|     16052|        269|       2|      678|  6.99|2017-01-28 13:44:...|    1|
|     16053|        269|       2|      703|  0.99|2017-01-28 16:58:...|    1|
|     16054|        269|       1|      750|  4.99|2017-01-29 00:10:...|    1|
+----------+-----------+--------+---------+------+--------------------+-----+
only showing top 5 rows



### Compute aggregate revenue per month

In [16]:
# Register the DataFrame as the temp view "payment" so it can be queried with SQL.
dfPayment.createOrReplaceTempView("payment")

# Total the payment amounts per month, highest revenue first.
# `amount` is a double column, so the totals carry floating-point noise.
revenue_by_month_sql = """
    SELECT month, sum(amount) as revenue
    FROM payment
    GROUP BY month
    ORDER BY revenue DESC
"""
spark.sql(revenue_by_month_sql).show()

+-----+------------------+
|month|           revenue|
+-----+------------------+
|    4|28559.460000003943|
|    3| 22916.93000000175|
|    2|10601.509999999556|
|    1| 4824.429999999856|
|    5|  514.180000000001|
+-----+------------------+

