In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import urllib

# `import urllib` on its own does not guarantee that the `parse` submodule is
# loaded; import it explicitly so `urllib.parse.quote` below cannot fail with
# an AttributeError on a fresh kernel.
import urllib.parse

# Delta table that stores the AWS credentials used by the Kinesis readers.
# NOTE(review): consider a secrets manager instead of a table readable by
# every workspace user.
delta_path = "dbfs:/user/hive/warehouse/authentication_credentials"
credentials_df = spark.read.format("delta").load(delta_path)

# Fetch both columns with a single Spark job instead of one collect() per column.
credentials_row = credentials_df.select('Access key ID', 'Secret access key').first()
if credentials_row is None:
    # An empty table previously surfaced as a bare IndexError on `collect()[0]`.
    raise ValueError(f"No credentials found in Delta table at {delta_path}")

ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']

# Percent-encode every reserved character (safe="" also encodes "/") so the
# secret can be embedded in a URL.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [0]:
%sql
-- Session-level settings applied before the Kinesis streams are opened:
--   * sets spark.databricks.delta.formatCheck.enabled to false
--   * sets spark.databricks.kinesis.listShards.enabled to false
-- NOTE(review): presumably required by the Kinesis source / IAM permissions in this workspace; confirm.
SET spark.databricks.delta.formatCheck.enabled=false;
SET spark.databricks.kinesis.listShards.enabled=false

In [0]:
from pyspark.sql.functions import expr
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType


# Layout of the JSON document carried in each pin record's `data` payload.
# Every field is nullable; `follower_count` stays a string at this stage.
pin_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("index", IntegerType()),
        ("unique_id", StringType()),
        ("title", StringType()),
        ("description", StringType()),
        ("poster_name", StringType()),
        ("follower_count", StringType()),
        ("tag_list", StringType()),
        ("is_image_or_video", StringType()),
        ("image_src", StringType()),
        ("downloaded", IntegerType()),
        ("save_location", StringType()),
        ("category", StringType()),
    )
])

# Open the Kinesis stream, starting at the 'latest' position.
stream_pin_df = (
    spark.readStream
    .format('kinesis')
    .option('streamName', 'Kinesis-Prod-Stream')
    .option('initialPosition', 'latest')
    .option('region', 'us-east-1')
    .option('accessKeyId', ACCESS_KEY)
    .option('secretAccessKey', SECRET_KEY)
    .load()
)

# Keep only the pin records, decode the binary payload as text, parse it with
# the schema above and flatten the parsed struct into top-level columns.
stream_pin_df = (
    stream_pin_df
    .filter(stream_pin_df["partitionKey"] == "pin-partition")
    .selectExpr("CAST(data as STRING) jsonData")
    .select(from_json("jsonData", pin_schema).alias("data"))
    .select("data.*")
)

display(stream_pin_df)
    

In [0]:
from pyspark.sql.functions import expr
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType


# Layout of the JSON document carried in each geolocation record's `data`
# payload; every field is nullable.
geo_schema = (
    StructType()
    .add("ind", IntegerType(), True)
    .add("timestamp", StringType(), True)
    .add("latitude", DoubleType(), True)
    .add("longitude", DoubleType(), True)
    .add("country", StringType(), True)
)

# Open the Kinesis stream, starting at the 'latest' position.
stream_geo_df = (
    spark.readStream
    .format('kinesis')
    .option('streamName', 'Kinesis-Prod-Stream')
    .option('initialPosition', 'latest')
    .option('region', 'us-east-1')
    .option('accessKeyId', ACCESS_KEY)
    .option('secretAccessKey', SECRET_KEY)
    .load()
)

# Keep only the geolocation records, decode the binary payload as text, parse
# it with the schema above and flatten the parsed struct into columns.
stream_geo_df = (
    stream_geo_df
    .filter(stream_geo_df["partitionKey"] == "geo-partition")
    .selectExpr("CAST(data as STRING) jsonData")
    .select(from_json("jsonData", geo_schema).alias("data"))
    .select("data.*")
)

display(stream_geo_df)
    

In [0]:
from pyspark.sql.functions import expr
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Layout of the JSON document carried in each user record's `data` payload;
# every field is nullable.
user_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("ind", IntegerType()),
        ("first_name", StringType()),
        ("last_name", StringType()),
        ("age", IntegerType()),
        ("date_joined", StringType()),
    )
])

# Open the Kinesis stream, starting at the 'latest' position.
stream_user_df = (
    spark.readStream
    .format('kinesis')
    .option('streamName', 'Kinesis-Prod-Stream')
    .option('initialPosition', 'latest')
    .option('region', 'us-east-1')
    .option('accessKeyId', ACCESS_KEY)
    .option('secretAccessKey', SECRET_KEY)
    .load()
)

# Keep only the user records, decode the binary payload as text, parse it with
# the schema above and flatten the parsed struct into top-level columns.
stream_user_df = (
    stream_user_df
    .filter(stream_user_df["partitionKey"] == "user-partition")
    .selectExpr("CAST(data as STRING) jsonData")
    .select(from_json("jsonData", user_schema).alias("data"))
    .select("data.*")
)

display(stream_user_df)

In [0]:
from pyspark.sql.functions import regexp_replace


def clean_pin_data(stream_pin_df):
    """Clean the parsed pin stream.

    Steps:
      * replace the known placeholder strings with null, column by column;
      * expand the ``k`` / ``M`` suffixes in ``follower_count`` and cast the
        result to an integer;
      * strip the ``Local save in`` prefix from ``save_location``;
      * rename ``index`` to ``ind`` and put the columns in their final order
        (``downloaded`` is not part of the output).

    Args:
        stream_pin_df: DataFrame with the columns produced by parsing the pin
            JSON payload (``index``, ``unique_id``, ``title``, ...).

    Returns:
        A new DataFrame with the cleaned columns ``ind``, ``unique_id``,
        ``title``, ``description``, ``follower_count``, ``poster_name``,
        ``tag_list``, ``is_image_or_video``, ``image_src``, ``save_location``
        and ``category``.
    """
    # Placeholder text that stands for "no data" in each column.
    placeholders = {
        'description': 'No description available Story format',
        'tag_list': 'N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e',
        'title': 'No Title Data Available',
        'follower_count': 'User Info Error',
        'image_src': 'Image src error.',
        'poster_name': 'User Info Error',
    }
    cleaned = stream_pin_df
    for column_name, placeholder in placeholders.items():
        cleaned = cleaned.replace({placeholder: None}, subset=[column_name])

    # Expand the suffixes BEFORE casting. Casting first turned values such as
    # "10k" into null, and the later regexp_replace calls converted the column
    # back to a string, so the result was neither complete nor numeric.
    follower_count = regexp_replace('follower_count', 'k', '000')
    follower_count = regexp_replace(follower_count, 'M', '000000')
    cleaned = cleaned.withColumn('follower_count', follower_count.cast('int'))

    # Also consume the whitespace after the prefix so the remaining path does
    # not start with a stray space.
    cleaned = cleaned.withColumn(
        'save_location',
        regexp_replace('save_location', r'Local save in\s*', ''),
    )

    return (
        cleaned
        .withColumnRenamed('index', 'ind')
        .select(
            'ind', 'unique_id', 'title', 'description', 'follower_count',
            'poster_name', 'tag_list', 'is_image_or_video', 'image_src',
            'save_location', 'category',
        )
    )

In [0]:
# Apply the pin cleaning function to the parsed pin stream and render the
# result; the cleaned DataFrame is kept under `pin_df_cleaned` for the write cell.
pin_df_cleaned = clean_pin_data(stream_pin_df)
display(pin_df_cleaned)

In [0]:
from pyspark.sql.functions import array, col, to_timestamp

def cleaned_geo_df(stream_geo_df):
    """Clean the parsed geolocation stream.

    Combines ``longitude`` and ``latitude`` (in that order) into a single
    ``coordinates`` array column, drops the two source columns, converts
    ``timestamp`` to a timestamp type and returns the columns in the order
    ``ind``, ``country``, ``coordinates``, ``timestamp``.

    Args:
        stream_geo_df: DataFrame with the columns produced by parsing the
            geolocation JSON payload.

    Returns:
        A new DataFrame with the cleaned geolocation columns.
    """
    coordinates = array(col('longitude'), col('latitude'))
    parsed_timestamp = to_timestamp(col('timestamp'))

    return (
        stream_geo_df
        .withColumn('coordinates', coordinates)
        .drop('longitude', 'latitude')
        .withColumn('timestamp', parsed_timestamp)
        .select('ind', 'country', 'coordinates', 'timestamp')
    )

In [0]:
# `cleaned_geo_df` names the cleaning function until this cell first runs and
# the cleaned DataFrame afterwards (the write cell reads it under that name).
# Keep a handle on the function so that re-running this cell does not try to
# call a DataFrame and fail with "TypeError: 'DataFrame' object is not callable".
if callable(cleaned_geo_df):
    _clean_geo_stream = cleaned_geo_df
cleaned_geo_df = _clean_geo_stream(stream_geo_df)
display(cleaned_geo_df)

In [0]:
from pyspark.sql.functions import concat

def cleaned_user_df(stream_user_df):
    """Clean the parsed user stream.

    Builds ``user_name`` as "<first_name> <last_name>", drops the two source
    columns, converts ``date_joined`` to a timestamp type and returns the
    columns in the order ``ind``, ``user_name``, ``age``, ``date_joined``.

    Args:
        stream_user_df: DataFrame with the columns produced by parsing the
            user JSON payload.

    Returns:
        A new DataFrame with the cleaned user columns. ``user_name`` is null
        when either name part is null.
    """
    # Local import keeps this cell self-contained; `lit` is not imported
    # explicitly anywhere in the notebook.
    from pyspark.sql.functions import lit

    # Join the name parts with an explicit space. The previous version
    # concatenated them with no separator and then ran a regexp_replace whose
    # pattern required a space that was never there. Its replacement used
    # Python-style "\1 \2" back-references, which Spark's Java regex engine
    # treats as the literal text "1 2", so any accidental match corrupted the name.
    full_name = concat(col('first_name'), lit(' '), col('last_name'))

    return (
        stream_user_df
        .withColumn('user_name', full_name)
        .drop('first_name', 'last_name')
        .withColumn('date_joined', to_timestamp(col('date_joined')))
        .select('ind', 'user_name', 'age', 'date_joined')
    )

In [0]:
# Apply the user cleaning function to the parsed user stream and render the
# result; the cleaned DataFrame is kept under `cleaned_user_stream` for the write cell.
cleaned_user_stream = cleaned_user_df(stream_user_df)
display(cleaned_user_stream)

In [0]:
# Reset all streaming checkpoints under the shared root before the writers start.
# NOTE: this also removes the geo and user checkpoint directories, so run this
# cell before (not while) those streams are writing.
dbutils.fs.rm("/tmp/kinesis/_checkpoints/", True)

# Append the cleaned pin stream to its Delta table. The checkpoint lives in a
# directory of its own: it used to be the root folder that also contains the
# geo and user checkpoints, which mixed this query's state with theirs.
(
    pin_df_cleaned.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/kinesis/_checkpoints/pin_df_cleaned/")
    .table("0eaf46a0829f_pin_table")
)

In [0]:


# Append the cleaned geolocation stream to its Delta table, tracking progress
# in a checkpoint directory dedicated to this query.
(
    cleaned_geo_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/kinesis/_checkpoints/cleaned_geo_df/")
    .table("0eaf46a0829f_geo_table")
)

In [0]:


# Append the cleaned user stream to its Delta table, tracking progress in a
# checkpoint directory dedicated to this query.
(
    cleaned_user_stream.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/kinesis/_checkpoints/cleaned_user_stream/")
    .table("0eaf46a0829f_user_table")
)