# **Streaming Data Processing using Spark on Databricks**

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
import urllib

*Read the CSV file containing the AWS keys into Databricks*

In [None]:
# Reader settings for the credentials CSV stored in DBFS.
file_type = "csv"
first_row_is_header = "true"
delimiter = ","

aws_keys_df = (
    spark.read.format(file_type)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load("/FileStore/tables/authentication_credentials.csv")
)

KINESIS_REGION = "us-east-1"

# Look up both key columns for the user in a single Spark job instead of
# collecting the same filtered frame twice.
credentials_row = (
    aws_keys_df
    .where(col('User name') == 'databricks-user')
    .select('Access key ID', 'Secret access key')
    .first()
)
if credentials_row is None:
    # Fail with a readable message rather than an IndexError on an empty result.
    raise ValueError(
        "No row for user 'databricks-user' in /FileStore/tables/authentication_credentials.csv"
    )

ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']
# Percent-encoded copy of the secret key (every reserved character escaped).
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

*Load the streaming data from the Kinesis streams into the dataframes (`df_pin`, `df_geo` and `df_user`)*

In [None]:
# Kinesis stream name behind each raw streaming DataFrame.
kinesis_dict = {'df_pin_stream': {'kinesis_pin':"streaming-12f7a43505b1-pin"}, 'df_geo_stream':{'kinesis_geo':"streaming-12f7a43505b1-geo"}, 'df_user_stream':{'kinesis_user':"streaming-12f7a43505b1-user"}}


def read_kinesis_stream(kinesis_stream_name):
    """Open one Kinesis stream as a streaming DataFrame with a string payload.

    Args:
        kinesis_stream_name: Name of the Kinesis stream to read.

    Returns:
        A streaming DataFrame with the columns ``jsonData`` (the ``data``
        column cast to string), ``approximateArrivalTimestamp`` and
        ``approximateArrivalDate`` (the date part of that timestamp).
    """
    raw_stream = (
        spark.readStream
        .format("kinesis")
        .option("streamName", kinesis_stream_name)
        .option("region", KINESIS_REGION)
        .option("initialPosition", '{"at_timestamp": "05/23/2023 00:00:00 GMT", "format": "MM/dd/yyyy HH:mm:ss ZZZ"}')
        .option("awsAccessKey", ACCESS_KEY)
        # The connector expects the secret key itself; a percent-encoded copy
        # differs from it whenever the key contains characters such as "/" or "+".
        .option("awsSecretKey", SECRET_KEY)
        .load()
    )
    return (
        raw_stream
        .selectExpr("cast (data as STRING) jsonData", "approximateArrivalTimestamp")
        .withColumn("approximateArrivalDate", to_date(col("approximateArrivalTimestamp")))
    )


# Bind each stream to an explicit name; the parsing statements below use these names.
df_pin_stream = read_kinesis_stream(kinesis_dict['df_pin_stream']['kinesis_pin'])
df_geo_stream = read_kinesis_stream(kinesis_dict['df_geo_stream']['kinesis_geo'])
df_user_stream = read_kinesis_stream(kinesis_dict['df_user_stream']['kinesis_user'])

# JSON fields extracted from each payload; the output columns keep the same names.
pin_fields = ["index", "unique_id", "title", "description", "follower_count", "poster_name", "tag_list", "is_image_or_video", "image_src", "save_location", "category", "downloaded"]
geo_fields = ['ind', 'country', 'latitude', 'longitude', 'timestamp']
user_fields = ['ind', "first_name", "last_name", 'age', 'date_joined']

df_pin = df_pin_stream.select(json_tuple('jsonData', *pin_fields).alias(*pin_fields))
df_geo = df_geo_stream.select(json_tuple('jsonData', *geo_fields).alias(*geo_fields))
df_user = df_user_stream.select(json_tuple('jsonData', *user_fields).alias(*user_fields))

>### Data Cleaning

#### *Data cleaning for df_pin*
 - Removing duplicate rows in the dataframe
 - Renaming the column `index` to `ind`
 - Re-ordering the column names in the dataframe
 - Replacing the values of `follower_count` column wherever necessary.
 - Converting the columns `follower_count`, `ind` and `downloaded` to the integer data type
 - Removing any additional strings from the `save_location` column
 - Replacing all the NA with `None`
 - Dropping the rows where all columns have null values


In [None]:
# Clean the pin stream: de-duplicate, rename/reorder columns, normalise
# follower counts, cast numeric columns and tidy the save location.
df_pin = (
    df_pin
    .dropDuplicates()
    .withColumnRenamed('index', 'ind')
    .select("ind", "unique_id", "title", "description", "follower_count", "poster_name",
            "tag_list", "is_image_or_video", "image_src", "save_location", "category", "downloaded")
    # regexp_replace takes a regular expression, not a SQL LIKE pattern, so the
    # suffixes and the error marker are matched as plain literals.
    .withColumn('follower_count', regexp_replace('follower_count', 'k', '000'))
    .withColumn('follower_count', regexp_replace('follower_count', 'M', '000000'))
    .withColumn('follower_count', regexp_replace('follower_count', 'User Info Error', ''))
    .withColumn('ind', col('ind').cast(IntegerType()))
    .withColumn('downloaded', col('downloaded').cast(IntegerType()))
    # Anything left non-numeric (e.g. the emptied error marker) becomes null here.
    .withColumn('follower_count', col('follower_count').cast(IntegerType()))
    .withColumn('save_location', regexp_replace('save_location', 'Local save in *', ''))
)
# DataFrames are immutable: the results of na.drop / na.fill must be assigned.
# Drop fully-null rows first; once two columns are filled no row is all-null.
df_pin = df_pin.na.drop(how="all")
df_pin = df_pin.na.fill('None', ['is_image_or_video', 'image_src'])

#### *Data cleaning for df_geo*
 - Removing duplicate rows in the dataframe
 - Converting the columns `ind` to integer type, `latitude` and `longitude` to double type, `timestamp` into a timestamp data type
 - Creating a new column `coordinates` holding an array of the `latitude` and `longitude` values, then dropping those two columns
 - Re-ordering the column names in the dataframe

In [None]:
# Clean the geo stream: de-duplicate, cast each column once, fold latitude and
# longitude into a single `coordinates` array and reorder the columns.
df_geo = (
    df_geo
    .dropDuplicates()
    .withColumn('ind', col('ind').cast(IntegerType()))
    .withColumn("timestamp", col("timestamp").cast(TimestampType()))
    .withColumn('latitude', col('latitude').cast(DoubleType()))
    .withColumn('longitude', col('longitude').cast(DoubleType()))
    .withColumn("coordinates", array(col("latitude"), col("longitude")))
    .drop('latitude', 'longitude')
    .select("ind", "country", "coordinates", "timestamp")
)

#### *Data cleaning for df_user*
 - Removing duplicate rows in the dataframe
 - Creating a new column `user_name` by combining the `first_name` and `last_name` columns, then dropping those two columns
 - Converting the `date_joined` column into a timestamp data type and `age` to integer data type
 - Re-ordering the column names in the dataframe

In [None]:

# Clean the user stream: de-duplicate, build `user_name` from the two name
# columns, cast `date_joined` to timestamp and `age` to integer, reorder.
df_user = (
    df_user
    .dropDuplicates()
    .withColumn("user_name", concat(col("first_name"), lit(" "), col("last_name")))
    .drop("first_name", "last_name")
    .withColumn('date_joined', col('date_joined').cast(TimestampType()))
    # `age` is a number of years, so it is cast to an integer, not a timestamp.
    .withColumn('age', col('age').cast(IntegerType()))
    .select("ind", "user_name", "age", "date_joined")
)

#### Creating Delta Table

- *Creating three delta tables for the three kinesis data streams:* 
    - *`12f7a43505b1_pin_table`*
    - *`12f7a43505b1_geo_table`*
    - *`12f7a43505b1_user_table`*

In [None]:
# Map each cleaned streaming DataFrame to the Delta table it is written to.
tables = {df_pin :'12f7a43505b1_pin_table', df_geo :'12f7a43505b1_geo_table', df_user :'12f7a43505b1_user_table'}
# Iterate over (DataFrame, table name) pairs; iterating the dict itself yields
# only the keys and cannot be unpacked into two names.
for stream_df, delta_table in tables.items():
    # NOTE(review): the checkpoint path sits under the Hive warehouse directory
    # with the same name as the table — confirm this is intended.
    (
        stream_df.writeStream
        .format("delta")
        .option("checkpointLocation", f"dbfs:/user/hive/warehouse/{delta_table}")
        .table(f'{delta_table}')
    )