In [1]:
from pyspark.sql import SparkSession
import os
import configparser

## Load AWS credentials

In [2]:
# Read the AWS key pair from the [AWS] section of the local `dwh.cfg` file.
config = configparser.ConfigParser()

# Use a context manager so the file handle is closed as soon as parsing is
# done, instead of staying open until garbage collection.
with open('dwh.cfg') as cfg_file:
    config.read_file(cfg_file)

# Both lookups raise configparser.NoSectionError / NoOptionError when the
# section or key is missing from the file.
AWS_ACCESS_KEY_ID = config.get('AWS', 'ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = config.get('AWS', 'SECRET_ACCESS_KEY')


## Create Spark session with the hadoop-aws package

In [6]:
import pyspark

# Build the Spark settings under their own name so that `config` (the
# ConfigParser holding the credentials file) is not rebound to a different
# kind of object.
spark_conf = pyspark.SparkConf()

# S3A connector and credentials.
# NOTE(review): `spark.jars.packages` requests hadoop-aws 3.3.1 while
# `spark.jars` below adds a local hadoop-aws-3.2.4.jar, so two different
# versions of the connector are requested. Confirm which one matches the
# Hadoop build of this Spark installation and drop the other.
spark_conf.set("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.1")
spark_conf.set("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY_ID)
spark_conf.set("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY)
# NOTE(review): the absolute /home/kevin/... paths tie this notebook to one
# machine; consider moving them to a config value.
spark_conf.set("spark.hadoop.metrics.conf.file", "/home/kevin/spark/spark-3.3.1-bin-hadoop3/conf/metrics.properties")
spark_conf.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark_conf.set("spark.jars", "/home/kevin/spark/hadoop-aws-3.2.4.jar,/home/kevin/spark/aws-java-sdk-bundle-1.11.901.jar")

# getOrCreate() returns an already-running session if there is one, so
# restart the kernel after editing the settings above to be sure they apply.
spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()
# Keep Spark's console logging down to errors only.
spark.sparkContext.setLogLevel('ERROR')

## Load data from S3

In [4]:
# Read the payment CSV from S3 through the s3a connector. The first row is
# used as the header; no schema is supplied or inferred, so every column is
# loaded as a string (see the printed schema).
df = (
    spark.read
    .format("csv")
    .option('header', 'true')
    .load("s3a://pagiladata1/payment.csv")
)
df.printSchema()

                                                                                

root
 |-- payment_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- staff_id: string (nullable = true)
 |-- rental_id: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- payment_date: string (nullable = true)



In [5]:
# Preview the first five rows of the raw payment data.
df.show(n=5)

[Stage 1:>                                                          (0 + 1) / 1]

+----------+-----------+--------+---------+------+--------------------+
|payment_id|customer_id|staff_id|rental_id|amount|        payment_date|
+----------+-----------+--------+---------+------+--------------------+
|     16050|        269|       2|        7|  1.99|2020-01-25 00:40:...|
|     16051|        269|       1|       98|  0.99|2020-01-25 18:16:...|
|     16052|        269|       2|      678|  6.99|2020-01-29 00:44:...|
|     16053|        269|       2|      703|  0.99|2020-01-29 03:58:...|
|     16054|        269|       1|      750|  4.99|2020-01-29 11:10:...|
+----------+-----------+--------+---------+------+--------------------+
only showing top 5 rows



                                                                                

In [7]:
import pyspark.sql.functions as F

# Convert the string `payment_date` column to a timestamp in place
# (same column name), then confirm the new type in the schema.
dfPayment = df.withColumn(
    'payment_date',
    F.to_timestamp(F.col('payment_date')),
)
dfPayment.printSchema()

root
 |-- payment_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- staff_id: string (nullable = true)
 |-- rental_id: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- payment_date: timestamp (nullable = true)



In [8]:
# Derive a `month` column from the payment timestamp and preview the result.
dfPayment = dfPayment.withColumn(
    'month',
    F.month(F.col('payment_date')),
)
dfPayment.show(n=5)

[Stage 2:>                                                          (0 + 1) / 1]

+----------+-----------+--------+---------+------+--------------------+-----+
|payment_id|customer_id|staff_id|rental_id|amount|        payment_date|month|
+----------+-----------+--------+---------+------+--------------------+-----+
|     16050|        269|       2|        7|  1.99|2020-01-25 00:40:...|    1|
|     16051|        269|       1|       98|  0.99|2020-01-25 18:16:...|    1|
|     16052|        269|       2|      678|  6.99|2020-01-29 00:44:...|    1|
|     16053|        269|       2|      703|  0.99|2020-01-29 03:58:...|    1|
|     16054|        269|       1|      750|  4.99|2020-01-29 11:10:...|    1|
+----------+-----------+--------+---------+------+--------------------+-----+
only showing top 5 rows



                                                                                

In [9]:
# Shut down the Spark session now that the analysis is finished.
spark.stop()