In [0]:
%run ./authenticate_aws

In [0]:
%sql
SET spark.databricks.delta.formatCheck.enabled=false

key,value
spark.databricks.delta.formatCheck.enabled,False


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, udf, when, regexp_replace, concat_ws, to_timestamp
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, TimestampType, ArrayType, DoubleType

# Get the active SparkSession, or build one named "KinesisStreamProcessing"
# if none exists yet (that is what getOrCreate does).
spark = (
    SparkSession.builder
    .appName("KinesisStreamProcessing")
    .getOrCreate()
)

def get_stream(stream_name: str, region: str = 'us-east-1', initial_position: str = 'earliest'):
    """Return a streaming DataFrame that reads the given Kinesis stream.

    Args:
        stream_name: Name of the Kinesis data stream to read.
        region: AWS region of the stream. Defaults to 'us-east-1', the value
            this helper previously hard-coded.
        initial_position: Value passed to the source's 'initialPosition'
            option. Defaults to 'earliest', the previously hard-coded value.

    Returns:
        The streaming DataFrame produced by the 'kinesis' source. Its raw
        payload is in the `data` column, which deserialize_stream casts to
        a string.

    NOTE(review): ACCESS_KEY and SECRET_KEY are not defined in this cell. They
    are presumably set by the `%run ./authenticate_aws` cell; confirm.
    """
    return (
        spark.readStream
        .format('kinesis')
        .option('streamName', stream_name)
        .option('initialPosition', initial_position)
        .option('region', region)
        .option('awsAccessKey', ACCESS_KEY)
        .option('awsSecretKey', SECRET_KEY)
        .load()
    )


def deserialize_stream(stream, schema):
    """Parse the JSON payload of a Kinesis stream into top-level columns.

    The binary `data` column is cast to a string, parsed with `from_json`
    against `schema`, and then flattened so that every field of `schema`
    becomes its own column.

    Args:
        stream: Streaming DataFrame with a `data` column (as returned by get_stream).
        schema: StructType describing the JSON records.

    Returns:
        A DataFrame whose columns are the fields of `schema`.
    """
    as_text = stream.selectExpr("CAST(data as STRING)")
    parsed = as_text.withColumn("data", from_json(col("data"), schema))
    return parsed.select(col("data.*"))


def write_stream_df_to_table(dataframe, name: str):
    """Start appending a streaming DataFrame to the Delta table `12a3da8f7ced_<name>_table`.

    Progress is checkpointed under
    `/tmp/kinesis/12a3da8f7ced_<name>_table_checkpoints/`, so each table gets
    its own checkpoint directory.

    Args:
        dataframe: Streaming DataFrame to write.
        name: Short table name, e.g. "pin", "geo" or "user".

    Returns:
        The StreamingQuery handle for the started stream, so callers can
        monitor it, await it, or `.stop()` it. The handle was previously
        discarded, and the stream could not be controlled from code.
    """
    query = (
        dataframe.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", f"/tmp/kinesis/12a3da8f7ced_{name}_table_checkpoints/")
        # toTable is the documented PySpark DataStreamWriter API for table sinks.
        .toTable(f"12a3da8f7ced_{name}_table")
    )
    return query

In [0]:
# Schemas that from_json uses to parse each stream's JSON records.
# follower_count is read as text because it carries "k"/"M" suffixes that are
# converted to numbers during cleaning.

pin_schema = (
    StructType()
    .add("index", IntegerType())
    .add("unique_id", StringType())
    .add("title", StringType())
    .add("description", StringType())
    .add("poster_name", StringType())
    .add("follower_count", StringType())
    .add("tag_list", StringType())
    .add("is_image_or_video", StringType())
    .add("image_src", StringType())
    .add("downloaded", IntegerType())
    .add("save_location", StringType())
    .add("category", StringType())
)

geo_schema = (
    StructType()
    .add("ind", IntegerType())
    .add("timestamp", TimestampType())
    .add("latitude", FloatType())
    .add("longitude", FloatType())
    .add("country", StringType())
)

user_schema = (
    StructType()
    .add("ind", IntegerType())
    .add("first_name", StringType())
    .add("last_name", StringType())
    .add("age", StringType())
    .add("date_joined", TimestampType())
)

In [0]:
# Open one streaming DataFrame per Kinesis stream, in pin, geo, user order.
pin_stream, geo_stream, user_stream = (
    get_stream(stream_name)
    for stream_name in (
        'streaming-12a3da8f7ced-pin',
        'streaming-12a3da8f7ced-geo',
        'streaming-12a3da8f7ced-user',
    )
)

In [0]:
# Parse each raw stream's JSON payload with its matching schema.
df_pin, df_geo, df_user = (
    deserialize_stream(raw_stream, record_schema)
    for raw_stream, record_schema in (
        (pin_stream, pin_schema),
        (geo_stream, geo_schema),
        (user_stream, user_schema),
    )
)

In [0]:
from pyspark.sql import functions as F

def add_nulls_to_dataframe_column(dataframe, column, value_to_replace):
    """Null out the cells of `column` that match a placeholder pattern.

    Args:
        dataframe: DataFrame to transform.
        column: Name of the column to clean.
        value_to_replace: SQL LIKE pattern (`%` and `_` are wildcards)
            identifying placeholder values.

    Returns:
        A new DataFrame in which matching cells of `column` are null and all
        other cells are unchanged.
    """
    target = col(column)
    is_placeholder = target.like(value_to_replace)
    return dataframe.withColumn(column, when(is_placeholder, None).otherwise(target))

# Replace placeholder / error strings in df_pin with real nulls.
# Each value is used as a LIKE pattern, so "%" is a wildcard
# (e.g. "No description available%" matches any text with that prefix).
columns_and_values_for_null = {
    "description": "No description available%",
    "follower_count": "User Info Error",
    "image_src": "Image src error.",
    "poster_name": "User Info Error",
    "tag_list": "N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",
    "title": "No Title Data Available"
}
for key, value in columns_and_values_for_null.items():
    df_pin = add_nulls_to_dataframe_column(df_pin, key, value)

# Convert follower_count from text such as "850", "12k" or "3M" to a number.
# Both suffixes are resolved in one expression, and each value is cast only
# after its suffix has been stripped and scaled. A CAST(... AS INT) after
# handling only "k" would turn every "...M" value into null before the "M"
# step could see it.
follower_text = col("follower_count")
df_pin = df_pin.withColumn(
    "follower_count",
    when(follower_text.endswith("k"),
         regexp_replace(follower_text, "k$", "").cast("double") * 1000)
    .when(follower_text.endswith("M"),
          regexp_replace(follower_text, "M$", "").cast("double") * 1000000)
    .otherwise(follower_text.cast("double"))
)

# Ensure that each column containing numeric data has a numeric data type
# (follower_count is already double after the step above).
numeric_columns = ["follower_count"]
for column in numeric_columns:
    df_pin = df_pin.withColumn(column, df_pin[column].cast("double"))

# Keep only the path part of save_location, starting at "topics/".
df_pin = df_pin.withColumn("save_location", F.expr("regexp_replace(save_location, '.*topics/', 'topics/')"))

# Rename the 'index' column to 'ind' and reorder the columns.
# "downloaded" is not in column_order, so it is dropped from df_pin.
df_pin = df_pin.withColumnRenamed("index", "ind")
column_order = ["ind", "unique_id", "title", "description", "follower_count",
                "poster_name", "tag_list", "is_image_or_video", "image_src",
                "save_location", "category"]
df_pin = df_pin.select(column_order)

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType

# Combine latitude and longitude into a single "coordinates" array column.
# A native F.array expression replaces the previous Python UDF. It yields the
# same array<double> column, but it is evaluated in the JVM, so rows are not
# serialized to a Python worker in every micro-batch.
# The explicit double casts keep the element type that the UDF declared
# (ArrayType(DoubleType())); geo_schema declares both inputs as FloatType.
df_geo = df_geo.withColumn(
    "coordinates",
    F.array(col("latitude").cast("double"), col("longitude").cast("double"))
)
df_geo = df_geo.drop("latitude", "longitude")

# geo_schema already parses 'timestamp' as TimestampType, so this conversion
# should be a no-op. It is kept as a safeguard in case the schema changes.
df_geo = df_geo.withColumn("timestamp", F.to_timestamp("timestamp"))

# geo_schema already names the key column 'ind'. The rename only has an
# effect if an 'index' column is present.
df_geo = df_geo.withColumnRenamed("index", "ind")
column_order = ["ind", "country", "coordinates", "timestamp"]
df_geo = df_geo.select(column_order)

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import concat_ws, to_timestamp

# Clean the user stream in one chained pipeline:
#   - merge first_name and last_name into a space-separated user_name;
#   - (re)parse date_joined as a timestamp (user_schema already declares it as
#     TimestampType);
#   - rename a possible 'index' column to 'ind' (user_schema already uses 'ind').
df_user = (
    df_user
    .withColumn("user_name", F.concat_ws(" ", "first_name", "last_name"))
    .drop("first_name", "last_name")
    .withColumn("date_joined", F.to_timestamp("date_joined"))
    .withColumnRenamed("index", "ind")
)

# Final column order for the user table.
column_order = ["ind", "user_name", "age", "date_joined"]
df_user = df_user.select(column_order)

In [0]:
# Display the cleaned dfs

In [0]:
display(df_pin)  # preview the cleaned pin DataFrame

ind,unique_id,title,description,follower_count,poster_name,tag_list,is_image_or_video,image_src,save_location,category


In [0]:
display(df_geo)  # preview the cleaned geo DataFrame

ind,country,coordinates,timestamp


In [0]:
display(df_user)  # preview the cleaned user DataFrame

ind,user_name,age,date_joined


In [0]:
# Write the streaming data to Delta Tables

In [0]:
write_stream_df_to_table(df_pin, "pin")  # stream df_pin into 12a3da8f7ced_pin_table

In [0]:
write_stream_df_to_table(df_geo, "geo")  # stream df_geo into 12a3da8f7ced_geo_table

In [0]:
write_stream_df_to_table(df_user, "user")  # stream df_user into 12a3da8f7ced_user_table