In [None]:
# import pyspark functions
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType, FloatType
# import URL processing
import urllib
from pyspark.sql.types import *
from pyspark.sql.functions import *
import urllib
import json 

# `import urllib` alone does not guarantee the `parse` submodule is loaded, so import it explicitly.
import urllib.parse

# Reader options for the credentials CSV
file_type = "csv"
# Indicates the file has its first row as the header
first_row_is_header = "true"
# Indicates the file uses a comma as the delimiter
delimiter = ","
# Spark (DBFS) path of the credentials CSV
credentials_file_location = "/FileStore/tables/authentication_credentials.csv"
# Value of the "User name" column whose keys are used below
credentials_user_name = "databricks-user"
# NOTE(review): consider a secret scope instead of a plaintext CSV on DBFS.

# Read the CSV file into a Spark DataFrame
aws_keys_df = (
    spark.read.format(file_type)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(credentials_file_location)
)

# Fetch the matching row once (a single Spark job) instead of filtering/collecting twice.
credential_rows = (
    aws_keys_df
    .where(col("User name") == credentials_user_name)
    .select("Access key ID", "Secret access key")
    .collect()
)
if not credential_rows:
    raise ValueError(
        f"No credentials found for user {credentials_user_name!r} in {credentials_file_location}"
    )

# AWS access key and secret key for the selected user
ACCESS_KEY = credential_rows[0]["Access key ID"]
SECRET_KEY = credential_rows[0]["Secret access key"]

# URL-encode the secret key; safe="" means "/" is escaped too.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")


In [None]:
def get_stream(stream_name: str, region: str = 'us-east-1', initial_position: str = 'earliest'):
    '''Open a Kinesis stream with spark.readStream and return it as a streaming DataFrame.

    Args:
        stream_name: Name of the Kinesis stream to read.
        region: Value for the Kinesis source's 'region' option (default 'us-east-1').
        initial_position: Value for the Kinesis source's 'initialPosition' option
            (default 'earliest').

    Returns:
        The streaming DataFrame produced by ``spark.readStream ... .load()``.

    Authenticates with the module-level ACCESS_KEY and SECRET_KEY.
    '''
    return (
        spark.readStream
        .format('kinesis')
        .option('streamName', stream_name)
        .option('initialPosition', initial_position)
        .option('region', region)
        .option('awsAccessKey', ACCESS_KEY)
        .option('awsSecretKey', SECRET_KEY)
        .load()
    )

def deserialize_stream(stream, schema):
    '''Parse the JSON payload of a raw stream DataFrame into typed columns.

    The ``data`` column is cast to a string and parsed with ``from_json``
    using ``schema``. The resulting struct is then flattened so that each
    schema field becomes a top-level column.

    Args:
        stream: DataFrame exposing a ``data`` column (e.g. from get_stream).
        schema: StructType describing the JSON records.

    Returns:
        DataFrame with one column per field of ``schema``.
    '''
    as_text = stream.selectExpr("CAST(data as STRING)")
    parsed = as_text.withColumn("data", from_json(col("data"), schema))
    return parsed.select(col("data.*"))

def add_nulls_to_dataframe_column(dataframe, column, value_to_replace):
    '''Set values in ``column`` that match a SQL LIKE pattern to null.

    Args:
        dataframe: DataFrame to transform.
        column: Name of the column to clean.
        value_to_replace: Pattern handed to ``Column.like``; ``%`` and ``_`` act as wildcards.

    Returns:
        A new DataFrame in which matching values of ``column`` are null and all other values are unchanged.
    '''
    target = col(column)
    is_placeholder = target.like(value_to_replace)
    return dataframe.withColumn(column, when(is_placeholder, None).otherwise(target))

def write_stream_df_to_table(dataframe, name: str):
    '''Start an append-mode streaming write of ``dataframe`` into the Delta table ``0a2528ba1237_{name}_table``.

    Each ``name`` gets its own checkpoint directory
    (``/tmp/kinesis/0a2528ba1237_{name}_table_checkpoints/``). This lets several
    streams run side by side without sharing a checkpoint.

    Args:
        dataframe: Streaming DataFrame to persist.
        name: Short dataset label (e.g. "pin", "geo", "user") used in the table and checkpoint names.

    Returns:
        Whatever ``DataStreamWriter.table(...)`` returns. This was previously discarded and is
        presumably a StreamingQuery (confirm), so callers can monitor or ``stop()`` the stream.
    '''
    table_name = f"0a2528ba1237_{name}_table"
    return (
        dataframe.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", f"/tmp/kinesis/{table_name}_checkpoints/")
        .table(table_name)
    )

In [None]:
# Schemas for the JSON records on each stream, written as (name, type) pairs.
# StructField is built with its default nullability for every field.
# follower_count stays a string here because it is parsed into an int during cleaning.
pin_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("index", IntegerType()),
        ("unique_id", StringType()),
        ("title", StringType()),
        ("description", StringType()),
        ("poster_name", StringType()),
        ("follower_count", StringType()),
        ("tag_list", StringType()),
        ("is_image_or_video", StringType()),
        ("image_src", StringType()),
        ("downloaded", IntegerType()),
        ("save_location", StringType()),
        ("category", StringType()),
    )
])
geo_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("ind", IntegerType()),
        ("timestamp", TimestampType()),
        ("latitude", FloatType()),
        ("longitude", FloatType()),
        ("country", StringType()),
    )
])
user_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("ind", IntegerType()),
        ("first_name", StringType()),
        ("last_name", StringType()),
        ("age", StringType()),
        ("date_joined", TimestampType()),
    )
])
     

In [None]:
# Open one Kinesis stream per dataset (pin, geo, user), in that order.
pin_stream, geo_stream, user_stream = (
    get_stream(kinesis_name)
    for kinesis_name in (
        'streaming-0a2528ba1237-pin',
        'streaming-0a2528ba1237-geo',
        'streaming-0a2528ba1237-user',
    )
)

In [None]:
# Parse each raw stream's JSON payload using its matching schema.
df_pin, df_geo, df_user = (
    deserialize_stream(raw_stream, record_schema)
    for raw_stream, record_schema in (
        (pin_stream, pin_schema),
        (geo_stream, geo_schema),
        (user_stream, user_schema),
    )
)

In [None]:
# Placeholder strings that stand in for missing data. Each is used as a SQL LIKE
# pattern ("%" is a wildcard), and matching cells are replaced with null.
columns_and_values_for_null = {
    "description": "No description available%",
    "follower_count": "User Info Error",
    "image_src": "Image src error.",
    "poster_name": "User Info Error",
    "tag_list": "N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",
    "title": "No Title Data Available"
}
for column_name, placeholder in columns_and_values_for_null.items():
    df_pin = add_nulls_to_dataframe_column(df_pin, column_name, placeholder)

# Final column layout: "index" is renamed to "ind" below, and "downloaded" is not kept.
new_pin_column_order = [
    "ind",
    "unique_id",
    "title",
    "description",
    "follower_count",
    "poster_name",
    "tag_list",
    "is_image_or_video",
    "image_src",
    "save_location",
    "category"
]

df_pin = (
    df_pin
    # expand "k"/"M" suffixes (e.g. "124k" -> "124000") before casting to int
    .withColumn("follower_count", regexp_replace("follower_count", "k", "000"))
    .withColumn("follower_count", regexp_replace("follower_count", "M", "000000"))
    .withColumn("follower_count", col("follower_count").cast('int'))
    # strip the "Local save in " prefix, leaving only the save path
    .withColumn("save_location", regexp_replace("save_location", "Local save in ", ""))
    .withColumnRenamed("index", "ind")
    .select(new_pin_column_order)
)

In [None]:
# Render the cleaned pin DataFrame in the notebook
display(df_pin)

ind,unique_id,title,description,follower_count,poster_name,tag_list,is_image_or_video,image_src,save_location,category
7528,fbe53c66-3442-4773-b19e-d3ec6f54dddf,,,,,,multi-video(story page format),,/data/mens-fashion,mens-fashion
2863,9bf39437-42a6-4f02-99a0-9a0383d8cd70,25 Super Fun Summer Crafts for Kids - Of Life and Lisa,Keep the kids busy this summer with these easy diy crafts and projects. Creative and…,124000.0,Of Life & Lisa | Lifestyle Blog,"Summer Crafts For Kids,Fun Crafts For Kids,Summer Kids,Toddler Crafts,Crafts To Do,Diy For Kids,Summer Snow,Diys For Summer,Craft Ideas For Girls",image,https://i.pinimg.com/originals/b3/bc/e2/b3bce2964e8c8975387b39660eed5f16.jpg,/data/diy-and-crafts,diy-and-crafts
7528,fbe53c66-3442-4773-b19e-d3ec6f54dddf,,,,,,multi-video(story page format),,/data/mens-fashion,mens-fashion
2863,9bf39437-42a6-4f02-99a0-9a0383d8cd70,25 Super Fun Summer Crafts for Kids - Of Life and Lisa,Keep the kids busy this summer with these easy diy crafts and projects. Creative and…,124000.0,Of Life & Lisa | Lifestyle Blog,"Summer Crafts For Kids,Fun Crafts For Kids,Summer Kids,Toddler Crafts,Crafts To Do,Diy For Kids,Summer Snow,Diys For Summer,Craft Ideas For Girls",image,https://i.pinimg.com/originals/b3/bc/e2/b3bce2964e8c8975387b39660eed5f16.jpg,/data/diy-and-crafts,diy-and-crafts
5730,1e1f0c8b-9fcf-460b-9154-c775827206eb,Island Oasis Coupon Organizer,"Description Coupon Organizer in a fun colorful fabric -island oasis, Great Size for the ""basic"" couponer - holds up to 500 coupons with ease, and is made long enough so that you…",0.0,Consuelo Aguirre,"Grocery Items,Grocery Coupons,Care Organization,Coupon Organization,Extreme Couponing,Couponing 101,Life Binder,Save My Money,Love Coupons",image,https://i.pinimg.com/originals/65/bb/ea/65bbeaf458907bb079317d8303c4fa0e.jpg,/data/finance,finance
8304,5b6d0913-25e4-43ab-839d-85d5516f78a4,The #1 Reason You’re Not His Priority Anymore - Matthew Coast,#lovequotes #matchmaker #matchmadeinheaven #loveyourself #respectyourself,51000.0,Commitment Connection,"Wise Quotes,Quotable Quotes,Words Quotes,Wise Words,Quotes To Live By,Great Quotes,Motivational Quotes,Inspirational Quotes,Funny Quotes",image,https://i.pinimg.com/originals/c6/64/ee/c664ee71524fb5a6e7b7b49233f93b43.png,/data/quotes,quotes
8731,ea760f71-febf-4023-b592-d17396659039,20 Koi Fish Tattoos For Lucky Men,"Koi fish tattoos are a popular choice for men who want to make a statement, thanks to their rich symbolism and bold design.",211000.0,TheTrendSpotter,"Dr Tattoo,Wörter Tattoos,Pisces Tattoos,Tatoo Art,Dream Tattoos,Dope Tattoos,Mini Tattoos,Finger Tattoos,Body Art Tattoos",image,https://i.pinimg.com/originals/8a/0c/0a/8a0c0a7b6236565c519acd41ad1a52c0.jpg,/data/tattoos,tattoos
1313,44662045-e891-4821-8a19-ebe7eedd371a,Liquid Lash Extensions Mascara,"Instantly create the look of lash extensions with this award-winning, best-selling mascara that won't clump, flake or smudge. Available in 3 shades!",43000.0,Thrive Causemetics,,video,https://i.pinimg.com/videos/thumbnails/originals/69/84/e2/6984e20f3e262098fa9c0614c3453254.0000001.jpg,/data/beauty,beauty
4315,21b59ba9-829d-4c33-8c27-4cd4c56d26b8,Podcasts for Teachers or Parents of Teenagers,"Podcasts for Teachers or Parents of Teenagers: Teaching teens middle school and high school can feel joyful and rewarding most days, but can also frustrate you with one challeng…",25000.0,Math Giraffe,"Middle School Classroom,High School Students,High School Teachers,Middle School Tips,High School Counseling,Ela Classroom,High School Science,Future Classroom,Google Classroom",image,https://i.pinimg.com/originals/50/19/31/501931a27ee4d076658980851b995b2c.jpg,/data/education,education
10794,c4bd2577-a7bb-4409-bb7a-17d5ed7e1cf1,TireBuyer,Nissan GT-R. Sick.,437.0,Ray Uyemura,"Lowrider,Old Vintage Cars,Antique Cars,Austin Martin,Nissan Gtr Black,Jaguar,1959 Cadillac,Cadillac Ct6,Old School Cars",image,https://i.pinimg.com/originals/0d/29/9f/0d299f3df020395aa7ce8387f40fbeed.jpg,/data/vehicles,vehicles


In [None]:
from pyspark.sql.types import DoubleType

# Combine latitude/longitude into one [latitude, longitude] array column.
# The built-in array() is evaluated by Spark itself, so it avoids a Python UDF round-trip.
# Casting to double preserves the ArrayType(DoubleType()) result the former UDF declared.
df_geo = df_geo.withColumn(
    "coordinates",
    array(col("latitude").cast(DoubleType()), col("longitude").cast(DoubleType())),
)
# The separate latitude/longitude columns are now redundant
df_geo = df_geo.drop("latitude", "longitude")
# geo_schema already declares `timestamp` as TimestampType; to_timestamp is kept as a
# defensive cast in case the upstream type changes.
df_geo = df_geo.withColumn("timestamp", to_timestamp("timestamp"))
# Drop rows whose index is null
df_geo = df_geo.na.drop(subset=["ind"])
# Final column order
new_geo_column_order = [
    "ind",
    "country",
    "coordinates",
    "timestamp",
]
df_geo = df_geo.select(new_geo_column_order)

In [None]:
# Render the cleaned geo DataFrame in the notebook
display(df_geo)

ind,country,coordinates,timestamp
7528,Albania,"List(-89.97869873046875, -173.29299926757812)",2020-08-28T03:52:47.000+0000
2863,Armenia,"List(-5.344449996948242, -177.9239959716797)",2020-04-27T13:34:16.000+0000
5730,Colombia,"List(-77.01499938964844, -101.43699645996094)",2021-04-19T17:37:03.000+0000
7528,Albania,"List(-89.97869873046875, -173.29299926757812)",2020-08-28T03:52:47.000+0000
2863,Armenia,"List(-5.344449996948242, -177.9239959716797)",2020-04-27T13:34:16.000+0000
5730,Colombia,"List(-77.01499938964844, -101.43699645996094)",2021-04-19T17:37:03.000+0000
8304,French Guiana,"List(-28.88520050048828, -164.8699951171875)",2019-09-13T04:50:29.000+0000
8731,Aruba,"List(-83.10399627685547, -171.302001953125)",2020-07-17T04:39:09.000+0000
1313,Maldives,"List(77.0447006225586, 61.91189956665039)",2018-06-26T02:39:25.000+0000
4315,Cote d'Ivoire,"List(-45.850799560546875, 66.10030364990234)",2019-12-15T03:51:28.000+0000


In [None]:
# Clean the df_user DataFrame.
# `F` was never imported in this notebook; concat_ws and to_timestamp are already in scope
# via `from pyspark.sql.functions import *`, matching how the other cells call them.

# Create a 'user_name' column by joining 'first_name' and 'last_name' with a space
df_user = df_user.withColumn('user_name', concat_ws(' ', 'first_name', 'last_name'))

# Drop the now-redundant 'first_name' and 'last_name' columns
df_user = df_user.drop('first_name', 'last_name')

# user_schema already declares 'date_joined' as TimestampType; to_timestamp is kept
# as a defensive cast in case the upstream type changes.
df_user = df_user.withColumn('date_joined', to_timestamp('date_joined'))

# Drop rows whose index is null
df_user = df_user.na.drop(subset=["ind"])

# Reorder the DataFrame columns
df_user = df_user.select('ind', 'user_name', 'age', 'date_joined')

# Display the cleaned DataFrame
display(df_user)

ind,user_name,age,date_joined
7528,Abigail Ali,20,2015-10-24T11:23:51.000+0000
2863,Dylan Holmes,32,2016-10-23T14:06:51.000+0000
5730,Rachel Davis,36,2015-12-08T20:02:43.000+0000
7528,Abigail Ali,20,2015-10-24T11:23:51.000+0000
2863,Dylan Holmes,32,2016-10-23T14:06:51.000+0000
5730,Rachel Davis,36,2015-12-08T20:02:43.000+0000
8304,Charles Berry,25,2015-12-28T04:21:39.000+0000
8731,Andrea Alexander,21,2015-11-10T09:27:42.000+0000
1313,Brittany Jones,32,2016-04-02T03:51:23.000+0000
4315,Michelle Prince,36,2015-12-20T16:38:13.000+0000


In [None]:
# Persist each cleaned stream to its own Delta table (0a2528ba1237_<name>_table).
# Previously all three queries used the same checkpointLocation "/tmp/kinesis/_checkpoints/".
# A checkpoint directory must not be shared between streaming queries.
# write_stream_df_to_table gives each query its own directory instead:
# /tmp/kinesis/0a2528ba1237_<name>_table_checkpoints/
# NOTE(review): the new checkpoint directories start empty, so the sources re-read from
# their configured initial position and may append rows the tables already contain.
write_stream_df_to_table(df_user, "user")
write_stream_df_to_table(df_geo, "geo")
write_stream_df_to_table(df_pin, "pin")

In [None]:
%sql
-- Inspect the Delta metadata (location, file count, size) of the pin table written by the streaming cell.
DESCRIBE DETAIL 0a2528ba1237_pin_table;

format,id,name,description,location,createdAt,lastModified,partitionColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion
delta,19ebda30-d7c6-4148-9c13-b615d30630e0,default.0a2528ba1237_pin_table,,dbfs:/user/hive/warehouse/0a2528ba1237_pin_table,2023-12-17T23:33:36.590+0000,2023-12-17T23:37:03.000+0000,List(),1,131478,Map(),1,2


In [None]:
# %sql
# SELECT COUNT(*) AS row_count
# FROM 0a2528ba1237_geo_table;

In [None]:
# Batch-read the pin Delta table populated by the streaming write and print every row without truncation
pin_table_reader = spark.read.format("delta")
df = pin_table_reader.table("0a2528ba1237_pin_table")
df.show(truncate=False)