#### Import libraries, local modules and variables

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import urllib

In [0]:
%run ./authenticate_aws

In [0]:
# authentication() comes from the ./authenticate_aws notebook loaded by %run.
# Call it once and read both keys from the same result instead of calling it twice.
aws_credentials = authentication()
ACCESS_KEY = aws_credentials['ACCESS_KEY']
SECRET_KEY = aws_credentials['SECRET_KEY']

In [0]:
%run ./cleaning_utils

#### Define the streaming data schemas

In [0]:
def _build_schema(fields):
    """Build a StructType from (column name, Spark data type) pairs.

    Each StructField is created with its default nullability, exactly as
    ``StructField(name, data_type)`` would be.
    """
    return StructType([StructField(column, data_type) for column, data_type in fields])


# Schema of the JSON payload on the pin stream (follower_count is read as a string).
pin_streaming_schema = _build_schema([
    ("index", IntegerType()),
    ("unique_id", StringType()),
    ("title", StringType()),
    ("description", StringType()),
    ("poster_name", StringType()),
    ("follower_count", StringType()),
    ("tag_list", StringType()),
    ("is_image_or_video", StringType()),
    ("image_src", StringType()),
    ("downloaded", IntegerType()),
    ("save_location", StringType()),
    ("category", StringType()),
])

# Schema of the JSON payload on the geo stream.
geo_streaming_schema = _build_schema([
    ("ind", IntegerType()),
    ("timestamp", TimestampType()),
    ("latitude", FloatType()),
    ("longitude", FloatType()),
    ("country", StringType()),
])

# Schema of the JSON payload on the user stream.
user_streaming_schema = _build_schema([
    ("ind", IntegerType()),
    ("first_name", StringType()),
    ("last_name", StringType()),
    ("age", IntegerType()),
    ("date_joined", TimestampType()),
])

#### Ingest streaming data from Kinesis

In [0]:
# Read and deserialise the streaming data from Kinesis
def read_kinesis_stream(stream_name: str, df_name: str, streaming_schema,
                        region: str = 'us-east-1', initial_position: str = 'earliest'):
    """Open a Kinesis stream as a streaming DataFrame and parse its JSON payload.

    Args:
        stream_name: Name of the Kinesis stream to read.
        df_name: Descriptive label only. It is not used to build the DataFrame
            and is kept so existing calls that pass it keep working.
        streaming_schema: StructType handed to ``from_json`` to parse each
            record's ``data`` payload.
        region: AWS region of the stream, passed as the ``region`` option
            (default ``'us-east-1'``, the previously hard-coded value).
        initial_position: Value for the ``initialPosition`` option
            (default ``'earliest'``, the previously hard-coded value).

    Returns:
        A streaming DataFrame with one column per field of ``streaming_schema``.
    """
    raw_stream = (
        spark.readStream
        .format('kinesis')
        .option('streamName', stream_name)
        .option('initialPosition', initial_position)
        .option('region', region)
        .option('awsAccessKey', ACCESS_KEY)
        .option('awsSecretKey', SECRET_KEY)
        .load()
    )
    # Deserialise before structuring data: cast the record payload (presumably
    # binary) to a string, parse it as JSON with the schema, then flatten the
    # parsed struct into top-level columns.
    return (
        raw_stream
        .selectExpr("CAST(data as STRING)")
        .withColumn("data", from_json(col("data"), streaming_schema))
        .select(col("data.*"))
    )

#### Read Kinesis Streams

In [0]:
# One streaming DataFrame per Kinesis stream, each parsed with its matching schema.
# Positional arguments: (stream name, label, schema).
df_pin = read_kinesis_stream(
    "streaming-0a60b9a8a831-pin", "df_pin", pin_streaming_schema)
df_geo = read_kinesis_stream(
    "streaming-0a60b9a8a831-geo", "df_geo", geo_streaming_schema)
df_user = read_kinesis_stream(
    "streaming-0a60b9a8a831-user", "df_user", user_streaming_schema)

#### Clean Dataframes

In [0]:
# Apply the per-stream cleaning functions (presumably defined in ./cleaning_utils,
# loaded by %run above), evaluated left to right: pin, geo, user.
pin_table, geo_table, user_table = (
    clean_df_pin(df_pin),
    clean_df_geo(df_geo),
    clean_df_user(df_user),
)

In [0]:
# Preview the cleaned user and geo streams. The extra display(df_user) preview was
# dropped: it showed the same user records as user_table before cleaning, which
# added a redundant streaming preview and exposed the uncleaned user data.
display(user_table)
display(geo_table)

ind,user_name,age,date_joined


#### Function to store a stream as a Databricks Delta table

In [0]:
# Function that writes a stream to a Databricks Delta table.
def store_as_delta(df_name, name: str, checkpoint_root: str = "/tmp/kinesis/_checkpoints/"):
    """Start an append-mode streaming write of ``df_name`` into table ``0a60b9a8a831_<name>``.

    Args:
        df_name: The streaming DataFrame to persist (despite the parameter name,
            this is the DataFrame itself, not its name).
        name: Suffix of the target table name. It also names this query's
            checkpoint sub-folder.
        checkpoint_root: Parent directory for checkpoints. Each call uses its own
            sub-folder ``<checkpoint_root>/<name>/``.

    Returns:
        Whatever ``writeStream...table(...)`` returns. On Databricks this is the
        started streaming query, which callers may keep to monitor or stop it.
    """
    # Structured Streaming needs a distinct checkpoint location per query.
    # Previously the pin, geo and user writes all shared one folder.
    checkpoint_location = f"{checkpoint_root.rstrip('/')}/{name}/"
    return (
        df_name.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_location)
        .table(f"0a60b9a8a831_{name}")
    )

In [0]:
# Start one streaming Delta write per cleaned table, in pin, geo, user order.
for cleaned_df, table_suffix in (
    (pin_table, 'pin_table'),
    (geo_table, 'geo_table'),
    (user_table, 'user_table'),
):
    store_as_delta(cleaned_df, table_suffix)

In [0]:
# Reset step: run this before re-running the store_as_delta cell.
# First stop every streaming query still active in this Spark session, so that
# no query is using a checkpoint folder while it is being deleted. Then remove
# the checkpoint root and everything under it.
for active_query in spark.streams.active:
    active_query.stop()
dbutils.fs.rm("/tmp/kinesis/_checkpoints/", recurse=True)