## Imports

In [None]:
# import pyspark functions
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType, FloatType
# import URL processing
import urllib

## AWS Credentials

In [None]:
# List the files in the Databricks FileStore tables directory so the exact name of the
# uploaded credentials CSV (loaded in the next cell) can be confirmed.
dbutils.fs.ls("/FileStore/tables")

In [None]:
# Load the AWS credentials CSV from the FileStore into a Spark DataFrame.
# The file's first row holds the column names ("header") and fields are
# comma-separated ("sep").
aws_keys_df = (
    spark.read
    .format("csv")
    .options(header="true", sep=",")
    .load("/FileStore/tables/authentication_credentials.csv")
)





In [None]:
import urllib.parse

# Fetch the credentials row for 'databricks-user' with a single query, instead of
# filtering and collecting the DataFrame separately for each key.
credentials_row = (
    aws_keys_df
    .where(col('User name') == 'databricks-user')
    .select('Access key ID', 'Secret access key')
    .first()
)
if credentials_row is None:
    # Fail with a clear message instead of an opaque IndexError from collect()[0]
    raise ValueError(
        "No row with 'User name' == 'databricks-user' found in the AWS credentials CSV"
    )

ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']

# URL-encode the secret key (safe="" also encodes '/') so it can be embedded in a URL.
# NOTE(review): not referenced by the streaming cells below; confirm it is still needed.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")


## Function definitions

In [None]:
def get_stream(stream_name: str, initial_position: str = 'earliest', region: str = 'us-east-1'):
    '''Read a Kinesis stream with spark.readStream and return it as a streaming DataFrame.

    Args:
        stream_name: Name of the Kinesis stream to read.
        initial_position: Value for the connector's 'initialPosition' option
            (defaults to 'earliest', the previously hard-coded value).
        region: AWS region of the stream (defaults to 'us-east-1').

    Returns:
        The raw (not yet deserialized) streaming DataFrame; its 'data' column is
        decoded by deserialize_stream.

    Authenticates with the module-level ACCESS_KEY / SECRET_KEY credentials.
    '''
    return (
        spark.readStream
        .format('kinesis')
        .option('streamName', stream_name)
        .option('initialPosition', initial_position)
        .option('region', region)
        .option('awsAccessKey', ACCESS_KEY)
        .option('awsSecretKey', SECRET_KEY)
        .load()
    )

def deserialize_stream(stream, schema):
    '''Parse the JSON payload of a stream DataFrame against *schema*.

    The 'data' column is cast to a string and parsed with from_json. The parsed
    struct is then flattened, so every schema field becomes a top-level column of
    the returned DataFrame.
    '''
    # Step 1: raw payload -> text (the cast keeps the column name 'data')
    as_text = stream.selectExpr("CAST(data as STRING)")
    # Step 2: text -> struct typed by the supplied schema
    parsed = as_text.withColumn("data", from_json(col("data"), schema))
    # Step 3: expand the struct's fields into columns
    return parsed.select(col("data.*"))

def add_nulls_to_dataframe_column(dataframe, column, value_to_replace):
    '''Return *dataframe* with values in *column* that match *value_to_replace* set to null.

    *value_to_replace* is a SQL LIKE pattern ('%' and '_' are wildcards); values
    that do not match are kept unchanged.
    '''
    target = col(column)
    matches_placeholder = target.like(value_to_replace)
    nulled = when(matches_placeholder, None).otherwise(target)
    return dataframe.withColumn(column, nulled)

def write_stream_df_to_table(dataframe, name: str, table_prefix: str = "1215be80977f"):
    '''Start a streaming append of *dataframe* into the Delta table "<table_prefix>_<name>_table".

    Args:
        dataframe: Streaming DataFrame to write.
        name: Short dataset name (e.g. "pin"); used in both the table name and the
            checkpoint directory.
        table_prefix: Prefix for the table and checkpoint names (defaults to the
            previously hard-coded value).

    Returns:
        The handle returned by the streaming writer (presumably a StreamingQuery on
        this runtime; confirm). Keeping it lets callers monitor, await or stop the
        write; previously it was discarded.
    '''
    return (
        dataframe.writeStream
        .format("delta")
        .outputMode("append")
        # A checkpoint per table lets each stream resume independently
        .option("checkpointLocation", f"/tmp/kinesis/{table_prefix}_{name}_table_checkpoints/")
        .table(f"{table_prefix}_{name}_table")
    )


## Schema definitions

In [None]:
# Schemas used to parse each stream's JSON payload. Each entry is a
# (field name, Spark type) pair; StructField's default makes every field nullable.
pin_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("index", IntegerType()),
        ("unique_id", StringType()),
        ("title", StringType()),
        ("description", StringType()),
        ("poster_name", StringType()),
        ("follower_count", StringType()),
        ("tag_list", StringType()),
        ("is_image_or_video", StringType()),
        ("image_src", StringType()),
        ("downloaded", IntegerType()),
        ("save_location", StringType()),
        ("category", StringType()),
    )
])
geo_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("ind", IntegerType()),
        ("timestamp", TimestampType()),
        ("latitude", FloatType()),
        ("longitude", FloatType()),
        ("country", StringType()),
    )
])
user_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("ind", IntegerType()),
        ("first_name", StringType()),
        ("last_name", StringType()),
        ("age", StringType()),
        ("date_joined", TimestampType()),
    )
])

## Get streams from Kinesis

In [None]:
# Open one Kinesis stream per source (pin, geo, user), in that order
pin_stream, geo_stream, user_stream = (
    get_stream(stream_name)
    for stream_name in (
        'streaming-1215be80977f-pin',
        'streaming-1215be80977f-geo',
        'streaming-1215be80977f-user',
    )
)

## Deserialize streams

In [None]:
# Parse each raw stream with its matching schema
df_pin, df_geo, df_user = (
    deserialize_stream(raw_stream, stream_schema)
    for raw_stream, stream_schema in (
        (pin_stream, pin_schema),
        (geo_stream, geo_schema),
        (user_stream, user_schema),
    )
)

## Clean data

In [None]:
# Placeholder values that stand for missing data in each pin column. They are SQL LIKE
# patterns (applied via add_nulls_to_dataframe_column), so '%' is a wildcard: e.g. any
# description starting with "No description available" is nulled.
columns_and_values_for_null = {
    "description": "No description available%",
    "follower_count": "User Info Error",
    "image_src": "Image src error.",
    "poster_name": "User Info Error",
    "tag_list": "N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",
    "title": "No Title Data Available"
}

# Replace each column's placeholder with null, so missing data is represented uniformly
for column_name, placeholder_pattern in columns_and_values_for_null.items():
    df_pin = add_nulls_to_dataframe_column(df_pin, column_name, placeholder_pattern)

# follower_count is text with an optional 'k' (thousands) or 'M' (millions) suffix.
# Scale the number rather than textually replacing the suffix with zeros: a text
# replacement turns "1.5k" into "1.5000", which does not cast to 1500. Casting
# through a decimal keeps the multiplication exact before the final int cast.
# Unsuffixed values are cast directly; non-numeric leftovers become null.
follower_text = col("follower_count")
df_pin = df_pin.withColumn(
    "follower_count",
    when(
        follower_text.endswith("k"),
        (regexp_replace(follower_text, "k$", "").cast("decimal(18,3)") * 1000).cast("int"),
    )
    .when(
        follower_text.endswith("M"),
        (regexp_replace(follower_text, "M$", "").cast("decimal(18,3)") * 1000000).cast("int"),
    )
    .otherwise(follower_text.cast("int")),
)

# Keep only the path in 'save_location' by stripping the "Local save in " prefix
df_pin = df_pin.withColumn("save_location", regexp_replace("save_location", "Local save in ", ""))

# Rename 'index' to 'ind', the key column name used by the geo and user schemas
df_pin = df_pin.withColumnRenamed("index", "ind")

# Reorder columns. NOTE(review): 'downloaded' is not listed, so it is dropped from
# the output; confirm that is intentional.
new_pin_column_order = [
    "ind",
    "unique_id",
    "title",
    "description",
    "follower_count",
    "poster_name",
    "tag_list",
    "is_image_or_video",
    "image_src",
    "save_location",
    "category"
]
df_pin = df_pin.select(new_pin_column_order)


In [None]:
# Import necessary types
from pyspark.sql.types import ArrayType, DoubleType

# Combine 'latitude' and 'longitude' into a single [latitude, longitude] array column.
# The built-in array() function replaces a Python UDF, which sent every row to a Python
# worker and back. The FloatType inputs are widened to double, so the column is still
# array<double>, as the UDF declared.
df_geo = df_geo.withColumn(
    "coordinates",
    array(col("latitude").cast(DoubleType()), col("longitude").cast(DoubleType())),
)

# The separate coordinate columns are now redundant
df_geo = df_geo.drop("latitude", "longitude")

# No timestamp conversion is needed: geo_schema declares 'timestamp' as TimestampType,
# so from_json already yields a timestamp column.

# Change the column order in the DataFrame
new_geo_column_order = [
    "ind",
    "country",
    "coordinates",
    "timestamp",
]
df_geo = df_geo.select(new_geo_column_order)


In [None]:
# Build the cleaned user DataFrame in one projection:
# - 'user_name' joins 'first_name' and 'last_name' with a space (concat_ws skips nulls)
#   and replaces those two columns;
# - 'date_joined' needs no conversion, because user_schema declares it TimestampType,
#   so from_json already yields a timestamp column;
# - the columns are emitted in the final order: ind, user_name, age, date_joined.
df_user = df_user.select(
    "ind",
    concat_ws(" ", "first_name", "last_name").alias("user_name"),
    "age",
    "date_joined",
)


## Display data

In [None]:
# Preview the cleaned pin stream in the notebook
display(df_pin)

In [None]:
# Preview the cleaned geo stream in the notebook
display(df_geo)

In [None]:
# Preview the cleaned user stream in the notebook
display(df_user)

## Write data to Delta tables

In [None]:
# Start the streaming append of the cleaned pin data to its Delta table
write_stream_df_to_table(df_pin, "pin")

In [None]:
# Start the streaming append of the cleaned geo data to its Delta table
write_stream_df_to_table(df_geo, "geo")

In [None]:
# Start the streaming append of the cleaned user data to its Delta table
write_stream_df_to_table(df_user, "user")