In [None]:
# pyspark functions
from pyspark.sql.functions import *
# URL processing
import urllib

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import urllib

# Reader settings for the credentials file: CSV format, first row holds the
# column names, fields are separated by a comma (the delimiter).
file_type = "csv"
first_row_is_header = "true"
delimiter = ","

# Load the credentials CSV into a Spark DataFrame.
# NOTE(review): the AWS keys are read from a plain CSV in FileStore; consider
# whether a secrets manager would be a better home for them.
aws_keys_df = (
    spark.read
    .format(file_type)
    .options(header=first_row_is_header, sep=delimiter)
    .load("/FileStore/tables/authentication_credentials.csv")
)

In [None]:
# Get the AWS access key and secret key from the spark dataframe.
# `import urllib` alone does not guarantee that the `urllib.parse` submodule is
# loaded, so import it explicitly before using it below.
import urllib.parse

# Fetch the first credentials row once (a single Spark job) instead of
# collecting the DataFrame separately for each column.
credential_rows = aws_keys_df.select('Access key ID', 'Secret access key').take(1)
if not credential_rows:
    # Fail with a clear message rather than an IndexError on an empty result.
    raise ValueError("aws_keys_df contains no rows; cannot read the AWS credentials")

ACCESS_KEY = credential_rows[0]['Access key ID']
SECRET_KEY = credential_rows[0]['Secret access key']

# Percent-encode the secret key; safe="" means no character (not even "/") is
# left unescaped.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [None]:
# Open a streaming reader on the pin Kinesis stream, starting from the
# earliest available position and authenticating with the AWS key pair.
df_pin = (
    spark.readStream
    .format("kinesis")
    .options(
        streamName="streaming-0e8c5a5fa275-pin",
        initialPosition="earliest",
        region="us-east-1",
        awsAccessKey=ACCESS_KEY,
        awsSecretKey=SECRET_KEY,
    )
    .load()
)

In [None]:
# Parsing df_pin from data
from pyspark.sql.functions import from_json, col, expr
from pyspark.sql.types import StructType, StringType, IntegerType  # include all necessary types

# Schema of one pin record; every field is nullable.
pin_schema = (
    StructType()
    .add("index", StringType(), True)
    .add("unique_id", StringType(), True)
    .add("title", StringType())
    .add("description", StringType())
    .add("poster_name", StringType())
    .add("follower_count", StringType())
    .add("tag_list", StringType())
    .add("is_image_or_video", StringType())
    .add("image_src", StringType())
    .add("downloaded", IntegerType())
    .add("save_location", StringType())
    .add("category", StringType())
)

# Outer schema: the pin record is parsed from a top-level "value" field.
outer_schema = StructType().add("value", pin_schema)

# Deserialise the stream: cast the "data" column to a JSON string ...
df_pin_json_string = df_pin.select(col("data").cast("string").alias("json_string"))

# ... parse that string against the outer schema ...
parsed_pin_json = from_json(col("json_string"), outer_schema).alias("parsed_json")
df_pin_parsed = df_pin_json_string.select(parsed_pin_json)

# ... and promote the fields nested under "value" to top-level columns.
df_pin_flattened = df_pin_parsed.select("parsed_json.value.*")


#display(df_pin_flattened)

In [None]:
# Cleaning df_pin
# Import the necessary libraries
from pyspark.sql.functions import regexp_replace, col, expr, when
from pyspark.sql.types import IntegerType

# Columns whose placeholder / empty values are replaced with nulls
columns_to_clean = ["description", "follower_count", "image_src", "poster_name", "tag_list", "title"]

# Per-column list of values that carry no real information
irrelevant_data_indicators = {
    "description": ["", "No description available Story format", "No description available", "Untitled"],
    "follower_count": ["", "User Info Error"],
    "image_src": ["", "Image src error."],
    "poster_name": ["", "User Info Error"],
    "tag_list": ["", "N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e"],
    "title": ["", "No Title Data Available"],
}

# Final column layout of the cleaned pin data ("downloaded" is not kept)
column_order = ["ind", "unique_id", "title", "description", "follower_count", "poster_name", "tag_list",
                "is_image_or_video", "image_src", "save_location", "category"]

# Null out every placeholder value, one column at a time
for column in columns_to_clean:
    is_placeholder = col(column).isin(irrelevant_data_indicators[column])
    df_pin_flattened = df_pin_flattened.withColumn(
        column, when(is_placeholder, None).otherwise(col(column))
    )

# Build the integer follower count: a trailing "k" means thousands, a trailing
# "M" means millions, anything else is cast to int directly.
follower_text = col("follower_count")
followers_in_thousands = (regexp_replace(follower_text, "k$", "").cast("float") * 1000).cast("int")
followers_in_millions = (regexp_replace(follower_text, "M$", "").cast("float") * 1000000).cast("int")
follower_count_as_int = (
    when(follower_text.endswith("k"), followers_in_thousands)
    .when(follower_text.endswith("M"), followers_in_millions)
    .otherwise(follower_text.cast("int"))
)

# Keep only the path part of save_location (strips the "Local save in " prefix)
save_location_path = regexp_replace(col("save_location"), "^Local save in (/data/.+)$", "$1")

# Apply the conversions, rename index -> ind and put the columns in order
df_pin_flattened = (
    df_pin_flattened
    .withColumn("follower_count", follower_count_as_int)
    .withColumn("save_location", save_location_path)
    .withColumnRenamed("index", "ind")
    .select(column_order)
)

# Show the cleaned DataFrame
#display(df_pin_flattened)

In [None]:
# Reinitialise the pin checkpoint and append the streamed df_pin data to its
# delta table.
#
# The pin query gets a checkpoint directory of its own: a checkpoint location
# must not be shared between streaming queries, and a dedicated directory means
# that resetting it cannot remove the checkpoint of another running query.
# The reset and the writer use the same constant so the two cannot drift apart.
PIN_CHECKPOINT_LOCATION = "/tmp/kinesis/pin/_checkpoints/"

# Recursively remove any previous checkpoint for this query.
# NOTE(review): without a checkpoint the query starts again from the stream's
# initial position, so rows already in the table are presumably appended again
# on a re-run — confirm this is intended.
dbutils.fs.rm(PIN_CHECKPOINT_LOCATION, True)

# Start the streaming write in append mode into the pin delta table
df_pin_flattened.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", PIN_CHECKPOINT_LOCATION) \
  .table("0e8c5a5fa275_pin_table")

In [None]:
# Open a streaming reader on the geo Kinesis stream, starting from the
# earliest available position and authenticating with the AWS key pair.
df_geo = (
    spark.readStream
    .format("kinesis")
    .options(
        streamName="streaming-0e8c5a5fa275-geo",
        initialPosition="earliest",
        region="us-east-1",
        awsAccessKey=ACCESS_KEY,
        awsSecretKey=SECRET_KEY,
    )
    .load()
)

In [None]:
# Parsing df_geo from data
from pyspark.sql.functions import from_json, col, expr
from pyspark.sql.types import StructType, StringType, IntegerType  # include all necessary types

# Schema of one geo record; every field is nullable.
geo_schema = (
    StructType()
    .add("ind", IntegerType())
    .add("timestamp", StringType())
    .add("latitude", DoubleType())
    .add("longitude", DoubleType())
    .add("country", StringType())
)

# Outer schema: the geo record is parsed from a top-level "value" field.
outer_schema = StructType().add("value", geo_schema)

# Deserialise the stream: cast the "data" column to a JSON string ...
df_geo_json_string = df_geo.select(col("data").cast("string").alias("json_string"))

# ... parse that string against the outer schema ...
parsed_geo_json = from_json(col("json_string"), outer_schema).alias("parsed_json")
df_geo_parsed = df_geo_json_string.select(parsed_geo_json)

# ... and promote the fields nested under "value" to top-level columns.
df_geo_flattened = df_geo_parsed.select("parsed_json.value.*")
# Flatten the nested structure if necessary

#display(df_geo_flattened)

In [None]:
# Cleaning df_geo
# Import the necessary libraries
from pyspark.sql.functions import col, array, unix_timestamp, from_unixtime
from pyspark.sql.types import ArrayType, FloatType, TimestampType

# Final column layout of the cleaned geo data
column_order = ["ind", "country", "coordinates", "timestamp"]

# [latitude, longitude] as a two-element array of floats
lat_lon_array = array(col("latitude").cast(FloatType()), col("longitude").cast(FloatType()))

# "timestamp" string parsed into a timestamp value
parsed_timestamp = to_timestamp(col("timestamp"), "yyyy-MM-dd'T'HH:mm:ss")

# Add "coordinates", drop the columns it replaces, convert "timestamp" and
# put the columns in order.
df_geo_flattened = (
    df_geo_flattened
    .withColumn("coordinates", lat_lon_array)
    .drop("latitude", "longitude")
    .withColumn("timestamp", parsed_timestamp)
    .select(column_order)
)

# Show the cleaned DataFrame
#display(df_geo_flattened)

In [None]:
# Reinitialise the geo checkpoint and append the streamed df_geo data to its
# delta table.
#
# The geo query gets a checkpoint directory of its own: a checkpoint location
# must not be shared between streaming queries, and a dedicated directory means
# that resetting it cannot remove the checkpoint of another running query.
# The reset and the writer use the same constant so the two cannot drift apart.
GEO_CHECKPOINT_LOCATION = "/tmp/kinesis/geo/_checkpoints/"

# Recursively remove any previous checkpoint for this query.
# NOTE(review): without a checkpoint the query starts again from the stream's
# initial position, so rows already in the table are presumably appended again
# on a re-run — confirm this is intended.
dbutils.fs.rm(GEO_CHECKPOINT_LOCATION, True)

# Start the streaming write in append mode into the geo delta table
df_geo_flattened.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", GEO_CHECKPOINT_LOCATION) \
  .table("0e8c5a5fa275_geo_table")

In [None]:
# Open a streaming reader on the user Kinesis stream, starting from the
# earliest available position and authenticating with the AWS key pair.
df_user = (
    spark.readStream
    .format("kinesis")
    .options(
        streamName="streaming-0e8c5a5fa275-user",
        initialPosition="earliest",
        region="us-east-1",
        awsAccessKey=ACCESS_KEY,
        awsSecretKey=SECRET_KEY,
    )
    .load()
)

In [None]:
from pyspark.sql.functions import from_json, col, expr
from pyspark.sql.types import StructType, StringType, IntegerType  # include all necessary types

# Schema of one user record; every field is nullable.
user_schema = (
    StructType()
    .add("ind", IntegerType())
    .add("first_name", StringType())
    .add("last_name", StringType())
    .add("age", IntegerType())
    .add("date_joined", StringType())
)

# Outer schema: the user record is parsed from a top-level "value" field.
outer_schema = StructType().add("value", user_schema)

# Deserialise the stream: cast the "data" column to a JSON string ...
df_user_json_string = df_user.select(col("data").cast("string").alias("json_string"))

# ... parse that string against the outer schema ...
parsed_user_json = from_json(col("json_string"), outer_schema).alias("parsed_json")
df_user_parsed = df_user_json_string.select(parsed_user_json)

# ... and promote the fields nested under "value" to top-level columns.
df_user_flattened = df_user_parsed.select("parsed_json.value.*")
# Flatten the nested structure if necessary

#display(df_user_flattened)

In [None]:
# Cleaning df_user data
# Import the necessary libraries
from pyspark.sql.functions import col, concat_ws, to_timestamp
from pyspark.sql.types import TimestampType

# Final column layout of the cleaned user data
column_order = ["ind", "user_name", "age", "date_joined"]

# Full name: first and last name joined with a single space
full_name = concat_ws(" ", col("first_name"), col("last_name"))

# "date_joined" string parsed into a timestamp value
parsed_date_joined = to_timestamp(col("date_joined"), "yyyy-MM-dd'T'HH:mm:ss")

# Add "user_name", drop the columns it replaces, convert "date_joined" and
# put the columns in order.
df_user_flattened = (
    df_user_flattened
    .withColumn("user_name", full_name)
    .drop("first_name", "last_name")
    .withColumn("date_joined", parsed_date_joined)
    .select(column_order)
)

# Show the cleaned DataFrame
#display(df_user_flattened)

In [None]:
# Reinitialise the user checkpoint and append the streamed df_user data to its
# delta table.
#
# The user query gets a checkpoint directory of its own: a checkpoint location
# must not be shared between streaming queries, and a dedicated directory means
# that resetting it cannot remove the checkpoint of another running query.
# The reset and the writer use the same constant so the two cannot drift apart.
USER_CHECKPOINT_LOCATION = "/tmp/kinesis/user/_checkpoints/"

# Recursively remove any previous checkpoint for this query.
# NOTE(review): without a checkpoint the query starts again from the stream's
# initial position, so rows already in the table are presumably appended again
# on a re-run — confirm this is intended.
dbutils.fs.rm(USER_CHECKPOINT_LOCATION, True)

# Start the streaming write in append mode into the user delta table
df_user_flattened.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", USER_CHECKPOINT_LOCATION) \
  .table("0e8c5a5fa275_user_table")