In [None]:
import urllib
import urllib.parse

from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window

# Read Credentials

In [None]:
# Reader options for the credentials file: CSV, first row is the header, comma-delimited.
file_type = "csv"
first_row_is_header = "true"
delimiter = ","

# Read the credentials CSV into a Spark DataFrame.
# NOTE(review): consider Databricks secrets (dbutils.secrets.get) instead of a CSV in DBFS.
aws_keys_df = (
    spark.read.format(file_type)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load("/FileStore/tables/authentication_credentials.csv")
)

# Fetch the 'databricks-user' row once (one Spark job instead of two) and fail with a
# clear message if it is missing, rather than an IndexError from collect()[0].
databricks_user_row = (
    aws_keys_df
    .where(col("User name") == "databricks-user")
    .select("Access key ID", "Secret access key")
    .first()
)
if databricks_user_row is None:
    raise ValueError(
        "No row with 'User name' == 'databricks-user' in "
        "/FileStore/tables/authentication_credentials.csv"
    )

# Get the AWS access key and secret key for that user.
ACCESS_KEY = databricks_user_row["Access key ID"]
SECRET_KEY = databricks_user_row["Secret access key"]

# URL-encode the secret key; safe="" makes quote() also escape "/".
# Not used in this section; presumably consumed by a URL (e.g. an S3 mount) elsewhere — verify.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")


# Read Streaming data

In [None]:
# The three Kinesis streams differ only in their name suffix, so they are built by one
# helper instead of three copy-pasted reader chains.
KINESIS_STREAM_PREFIX = "streaming-12853887c065"
KINESIS_REGION = "us-east-1"


def _read_kinesis_stream(suffix, initial_position="earliest"):
    """Return a streaming DataFrame over the Kinesis stream '<KINESIS_STREAM_PREFIX>-<suffix>'.

    Authenticates with the module-level ACCESS_KEY / SECRET_KEY read in the credentials cell.
    """
    return (
        spark.readStream
        .format("kinesis")
        .option("streamName", KINESIS_STREAM_PREFIX + "-" + suffix)
        .option("initialPosition", initial_position)
        .option("region", KINESIS_REGION)
        .option("awsAccessKey", ACCESS_KEY)
        .option("awsSecretKey", SECRET_KEY)
        .load()
    )


# Pin posts.
df_pin = _read_kinesis_stream("pin")
# Geolocation records.
df_geo = _read_kinesis_stream("geo")
# User records.
df_user = _read_kinesis_stream("user")


# Convert streaming data to DataFrames

In [None]:
### Schemas for the JSON payload carried in each Kinesis record's `data` column.

# Pin post schema. follower_count is kept as a string because it may carry a suffix
# such as "k"; it is converted to an integer in the cleaning cell.
df_pin_schema = StructType([
    StructField("index", IntegerType(), True),
    StructField("unique_id", StringType(), True),
    StructField("title", StringType(), True),
    StructField("follower_count", StringType(), True),
    StructField("poster_name", StringType(), True),
    StructField("tag_list", StringType(), True),
    StructField("is_image_or_video", StringType(), True),
    StructField("image_src", StringType(), True),
    StructField("save_location", StringType(), True),
    StructField("category", StringType(), True),
    StructField("downloaded", IntegerType(), True),
    StructField("description", StringType(), True),
])

# Geolocation schema.
df_geo_schema = StructType([
    StructField("ind", IntegerType(), True),
    StructField("country", StringType(), True),
    StructField("latitude", StringType(), True),
    StructField("longitude", StringType(), True),
    StructField("timestamp", StringType(), True),
])

# User schema.
df_user_schema = StructType([
    StructField("ind", IntegerType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("age", StringType(), True),
    StructField("date_joined", StringType(), True),
])


def _parse_json_payload(stream_df, schema):
    """Cast the Kinesis `data` column to string, parse it as JSON with `schema`,
    and flatten the parsed struct into top-level columns."""
    return (
        stream_df
        .selectExpr("CAST(data as STRING)")
        .withColumn("data", from_json(col("data"), schema))
        .select("data.*")
    )


# Convert each stream's JSON payload into typed DataFrame columns.
df_pin = _parse_json_payload(df_pin, df_pin_schema)
df_geo = _parse_json_payload(df_geo, df_geo_schema)
df_user = _parse_json_payload(df_user, df_user_schema)

# Cleaning pin posts

In [None]:
# Convert follower_count text to an integer count. Values may be plain numbers ("228")
# or carry a thousands/millions suffix ("22k", "3M"). Only suffixed values are scaled:
# multiplying every value by 1000 would inflate unsuffixed counts.
follower_count_text = upper(trim(col("follower_count")))
follower_count_digits = regexp_replace(follower_count_text, "[KM]$", "")
follower_count_multiplier = (
    when(follower_count_text.endswith("K"), lit(1000))
    .when(follower_count_text.endswith("M"), lit(1000000))
    .otherwise(lit(1))
)
df_pin = df_pin.withColumn(
    "follower_count",
    # Entries that are not numeric (after removing the suffix) become null here,
    # and 0 after the fillna below.
    when(
        follower_count_digits.rlike(r"^[0-9]+(\.[0-9]+)?$"),
        (follower_count_digits.cast("double") * follower_count_multiplier).cast("int"),
    ),
)

# Ensure integer types (the schema already declares these as IntegerType).
df_pin = df_pin.withColumn("downloaded", col("downloaded").cast("int"))
df_pin = df_pin.withColumn("index", col("index").cast("int"))

# Replace nulls with 0. fillna with an int value only affects numeric columns
# (follower_count, downloaded, index); string columns are left as-is.
df_pin = df_pin.fillna(0)

# TODO: replace empty / irrelevant string entries with None (noted in the original
# notebook but never implemented).

# Keep only the save-location path. Assumes values are four space-separated tokens
# with the path last (e.g. "<word> <word> <word> <path>") — TODO confirm against data.
df_pin = df_pin.withColumn("save_location", split(col("save_location"), " ").getItem(3))

# Rename the index column to ind.
df_pin = df_pin.withColumnRenamed("index", "ind")

# Final column order.
df_pin_header = [
    "ind",
    "unique_id",
    "title",
    "description",
    "follower_count",
    "poster_name",
    "tag_list",
    "is_image_or_video",
    "image_src",
    "save_location",
    "category",
]

df_pin = df_pin.select(df_pin_header)

df_pin.display()

# Cleaning geolocation

In [None]:
# Combine latitude and longitude into one `coordinates` array column, drop the originals,
# and convert the timestamp string to a timestamp type (one cast is enough).
# NOTE(review): latitude/longitude are StringType in the schema, so `coordinates` is
# array<string>; consider casting to double if numeric coordinates are needed.
df_geo = (
    df_geo
    .withColumn("coordinates", array("latitude", "longitude"))
    .drop("latitude", "longitude")
    .withColumn("timestamp", col("timestamp").cast("timestamp"))
)

# Final column order.
df_geo_header = ["ind", "country", "coordinates", "timestamp"]

df_geo = df_geo.select(df_geo_header)
df_geo.display()

# Cleaning user data

In [None]:
# Build user_name from first_name and last_name separated by a space. concat_ws keeps
# the word boundary (concat would give "JohnSmith") and skips nulls instead of nulling
# the whole result when one part is missing.
df_user = df_user.withColumn("user_name", concat_ws(" ", col("first_name"), col("last_name")))

# The name parts are no longer needed.
df_user = df_user.drop("first_name", "last_name")

# Convert date_joined from a string to a timestamp type.
df_user = df_user.withColumn("date_joined", col("date_joined").cast("timestamp"))

# Final column order.
df_user_header = ["ind", "user_name", "age", "date_joined"]

df_user = df_user.select(df_user_header)

df_user.display()

# Write the streaming data to Delta Tables

In [None]:
# Root for streaming checkpoints. Each query gets its own subdirectory: Structured
# Streaming requires a distinct checkpoint location per query, and sharing one
# directory across the three writers makes them clash.
CHECKPOINT_ROOT = "/tmp/kinesis/_checkpoints/"

# Remove existing checkpoints so each run starts the queries afresh.
# NOTE(review): without checkpoints the readers restart from their initialPosition and
# re-append already-processed records to the Delta tables — confirm this is intended.
dbutils.fs.rm(CHECKPOINT_ROOT, True)


def _write_stream_to_delta(stream_df, table_name):
    """Start an append-mode streaming write of `stream_df` into the Delta table `table_name`,
    checkpointing under CHECKPOINT_ROOT/<table_name>/. Returns the started query."""
    return (
        stream_df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", CHECKPOINT_ROOT + table_name + "/")
        .table(table_name)
    )


pin_query = _write_stream_to_delta(df_pin, "12853887c065_pin_table")
geo_query = _write_stream_to_delta(df_geo, "12853887c065_geo_table")
user_query = _write_stream_to_delta(df_user, "12853887c065_user_table")