"""
@Author: Samarth BM

@Date: 2021-12-03

@Last Modified by: Samarth BM

@Title : To read the data from aws s3 and process the data and store the cleaned data to hdfs.

"""

In [2]:
import findspark

In [3]:
findspark.init('/home/samarth/spark')

In [None]:
from pyspark.sql import *
spark = SparkSession.builder.getOrCreate()
from pyspark.sql.functions import *

In [5]:
from dotenv import load_dotenv
load_dotenv('.env')
import os

In [6]:
access_key=os.getenv("ACCESS_KEY_ID")
access_key_secret=os.getenv("SECRETE_ACCESS_KEY")

Reading data stored in s3 bucket

In [7]:
df= spark.read.option("inferSchema", "true").csv("s3a://{0}:{1}@realtimestockdata/StockData.csv".format(access_key,access_key_secret), header=True)

21/12/08 18:49:00 WARN S3xLoginHelper: The Filesystem URI contains login details. This is insecure and may be unsupported in future.


In [8]:
df.show()

+-------------------+------+------+------+------+------+
|                  0|     1|     2|     3|     4|     5|
+-------------------+------+------+------+------+------+
|2021-12-01 19:31:00|117.03|117.03|117.03|117.03|   150|
|2021-12-01 19:03:00|117.43|117.43|117.43|117.43|   250|
|2021-12-01 18:26:00| 117.1| 117.1| 117.1| 117.1|   438|
|2021-12-01 18:01:00|117.15|117.15|117.15|117.15|   100|
|2021-12-01 17:48:00|116.95|116.95|116.95|116.95|   151|
|2021-12-01 17:17:00|116.93|116.93|116.93|116.93|   600|
|2021-12-01 17:15:00|116.93|116.93|116.93|116.93|   194|
|2021-12-01 17:11:00|116.96|116.96|116.96|116.96|   105|
|2021-12-01 17:10:00|116.99|116.99|116.99|116.99|   116|
|2021-12-01 17:01:00| 117.0| 117.0| 117.0| 117.0|   305|
|2021-12-01 16:48:00| 117.2| 117.2| 117.2| 117.2|   100|
|2021-12-01 16:39:00|117.01|117.01|117.01|117.01|   514|
|2021-12-01 16:26:00|117.43|117.43|117.43|117.43|   125|
|2021-12-01 16:17:00| 117.0| 117.0| 117.0| 117.0|   250|
|2021-12-01 16:04:00|116.92|116

Renaming the columns

In [11]:
df1=df.withColumnRenamed('0','time')\
    .withColumnRenamed('1','open')\
    .withColumnRenamed('2','high')\
    .withColumnRenamed('3','low')\
    .withColumnRenamed('4','close')\
    .withColumnRenamed('5','volume')

df1.show(5)

+-------------------+------+------+------+------+------+
|               time|  open|  high|   low| close|volume|
+-------------------+------+------+------+------+------+
|2021-12-01 19:31:00|117.03|117.03|117.03|117.03|   150|
|2021-12-01 19:03:00|117.43|117.43|117.43|117.43|   250|
|2021-12-01 18:26:00| 117.1| 117.1| 117.1| 117.1|   438|
|2021-12-01 18:01:00|117.15|117.15|117.15|117.15|   100|
|2021-12-01 17:48:00|116.95|116.95|116.95|116.95|   151|
+-------------------+------+------+------+------+------+
only showing top 5 rows



Changing the data types of each column.

In [13]:
from pyspark.sql.types import DoubleType

In [14]:
df2=df1\
    .withColumn("open",col("open").cast(DoubleType()))\
    .withColumn("high",col("high").cast(DoubleType()))\
    .withColumn("low",col("low").cast(DoubleType()))\
    .withColumn("close",col("close").cast(DoubleType()))\
    .withColumn("volume",col("volume").cast(DoubleType()))

In [15]:
df2.printSchema()

root
 |-- time: string (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: double (nullable = true)



Checking for null values

In [16]:
cleaned_data=df2.toPandas()
cleaned_data.isna().sum()

time      0
open      0
high      0
low       0
close     0
volume    0
dtype: int64

Storing the cleaned data to hdfs

In [None]:
df2.write.option("header", True).csv("CleanedStockData")