## Connecting AWS Kinesis to Databricks

In [None]:
# URL processing
import urllib
import urllib.parse

# pyspark functions
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Location of the Delta table that stores the AWS credentials
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Load the credentials table into a Spark DataFrame
access_key_df = spark.read.format("delta").load(delta_table_path)

# Fetch both key columns from the first row in a single Spark job
# (collecting each column separately would run the query twice).
credentials_row = access_key_df.select('Access key ID', 'Secret access key').first()
if credentials_row is None:
    # Fail with a clear message instead of an IndexError on an empty table
    raise ValueError(f"No AWS credentials found in {delta_table_path}")

ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']
# URL-encode the secret key; safe="" means '/' is percent-encoded as well
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

### Creating Schema 

In [None]:
# Schema used to parse the JSON payload of the pin stream.
# follower_count is declared as a string here; it is cast to an integer
# later, during cleaning.
schema_pin = (
    StructType()
    .add("index", IntegerType())
    .add("unique_id", StringType())
    .add("title", StringType())
    .add("description", StringType())
    .add("poster_name", StringType())
    .add("follower_count", StringType())
    .add("tag_list", StringType())
    .add("is_image_or_video", StringType())
    .add("image_src", StringType())
    .add("downloaded", IntegerType())
    .add("save_location", StringType())
    .add("category", StringType())
)

In [None]:
# Schema used to parse the JSON payload of the user stream.
# age is declared as a string here; it is cast to an integer later,
# during cleaning.
schema_user = (
    StructType()
    .add("index", IntegerType())
    .add("first_name", StringType())
    .add("last_name", StringType())
    .add("age", StringType())
    .add("date_joined", DateType())
)

In [None]:
# Schema used to parse the JSON payload of the geo stream.
# timestamp is declared as a string here; it is converted to a timestamp
# later, during cleaning.
schema_geo = (
    StructType()
    .add("index", IntegerType())
    .add("country", StringType())
    .add("timestamp", StringType())
    .add("latitude", FloatType())
    .add("longitude", FloatType())
)

## Reading Stream data from AWS Kinesis

In [None]:
# Open a streaming read on the pin Kinesis stream, starting from the
# latest position and authenticating with the keys loaded above.
df_pin = (
    spark.readStream
    .format("kinesis")
    .options(
        streamName="streaming-0e4753f224a7-pin",
        initialPosition="latest",
        region="us-east-1",
        awsAccessKey=ACCESS_KEY,
        awsSecretKey=SECRET_KEY,
    )
    .load()
)

In [None]:
# Decode the raw Kinesis 'data' column as a string, parse it as JSON with
# schema_pin, and expand the parsed struct into top-level columns.
pin_payload = from_json(col("data").cast("string"), schema_pin)
df_pin = df_pin.select(pin_payload.alias("parsedJson")).select("parsedJson.*")

display(df_pin)

## Cleaning of pin data

In [None]:

# Rename the 'index' column to 'ind'. The name is spelled exactly as in
# schema_pin so the rename does not rely on case-insensitive column resolution.
df_pin = df_pin.withColumnRenamed('index', 'ind')

# Keep only the columns needed downstream ('downloaded' is left out)
df_pin1 = df_pin.select(['ind', 'unique_id', 'title', 'description', 'follower_count', 'poster_name', 'tag_list', 'is_image_or_video', 'image_src', 'save_location', 'category'])

# Expand the magnitude suffixes into zeros so the integer cast below yields the
# real count ('10k' -> 10000, '2M' -> 2000000). Replacing the suffix with a
# blank would silently drop the multiplier.
# NOTE(review): assumes counts look like '456', '10k' or '2M' (no decimals) -
# confirm against the source data.
df_pin1 = df_pin1.withColumn('follower_count', regexp_replace(col('follower_count'), 'k', '000'))
df_pin1 = df_pin1.withColumn('follower_count', regexp_replace(col('follower_count'), 'M', '000000'))

# Replace every non-alphanumeric character in poster_name with a space
df_pin1 = df_pin1.withColumn('poster_name', regexp_replace(col('poster_name'), '[^a-zA-Z0-9]', ' '))

# Cast follower_count and ind columns to IntegerType
df_pin1 = df_pin1.withColumn('follower_count', col('follower_count').cast(IntegerType()))
df_pin1 = df_pin1.withColumn('ind', col('ind').cast(IntegerType()))

# Keep rows whose poster_name is not 'User Info Error' and whose
# follower_count could be parsed as an integer
df_pin1 = df_pin1.filter((col('poster_name') != 'User Info Error') & (col('follower_count').isNotNull()))

# Keep up to 100 characters of save_location starting at the 14th character (1-based).
# NOTE(review): presumably this strips a fixed textual prefix - confirm the offset.
df_pin1 = df_pin1.withColumn('save_location', col('save_location').substr(14, 100))

In [None]:
# Append the cleaned pin stream to the Delta table 0e4753f224a7_pin_table.
# The checkpoint directory is specific to this query: a streaming query's
# checkpoint location must not be shared with any other query.
df_pin1.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/tmp/kinesis/pin/_checkpoints/") \
  .table("0e4753f224a7_pin_table")

In [None]:
# Open a streaming read on the geo Kinesis stream, starting from the
# latest position and authenticating with the keys loaded above.
df_geo = (
    spark.readStream
    .format("kinesis")
    .options(
        streamName="streaming-0e4753f224a7-geo",
        initialPosition="latest",
        region="us-east-1",
        awsAccessKey=ACCESS_KEY,
        awsSecretKey=SECRET_KEY,
    )
    .load()
)

In [None]:
# Decode the raw Kinesis 'data' column as a string, parse it as JSON with
# schema_geo, and expand the parsed struct into top-level columns.
geo_payload = from_json(col("data").cast("string"), schema_geo)
df_geo = df_geo.select(geo_payload.alias("parsedJson")).select("parsedJson.*")
display(df_geo)

## Cleaning of geo Stream data

In [None]:
# Build the cleaned geo DataFrame in a single projection:
#   ind         - 'index' renamed and cast to IntegerType
#   country     - passed through unchanged
#   coordinates - array of [latitude, longitude] (the two source columns are dropped)
#   timestamp   - string parsed with the pattern yyyy-MM-dd'T'HH:mm:ss
df_geo1 = df_geo.select(
    col('index').cast(IntegerType()).alias('ind'),
    col('country'),
    array(col('latitude'), col('longitude')).alias('coordinates'),
    to_timestamp(col('timestamp'), "yyyy-MM-dd'T'HH:mm:ss").alias('timestamp'),
)

In [None]:
# Append the cleaned geo stream to the Delta table 0e4753f224a7_geo_tables.
# The checkpoint directory is specific to this query: a streaming query's
# checkpoint location must not be shared with any other query.
# NOTE(review): the table name ends in "_tables" while the sibling tables end
# in "_table" - confirm whether that is intentional before renaming.
df_geo1.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/tmp/kinesis/geo/_checkpoints/") \
  .table("0e4753f224a7_geo_tables")

In [None]:
# Open a streaming read on the user Kinesis stream, starting from the
# latest position and authenticating with the keys loaded above.
df_user = (
    spark.readStream
    .format("kinesis")
    .options(
        streamName="streaming-0e4753f224a7-user",
        initialPosition="latest",
        region="us-east-1",
        awsAccessKey=ACCESS_KEY,
        awsSecretKey=SECRET_KEY,
    )
    .load()
)

In [None]:
# Decode the raw Kinesis 'data' column as a string, parse it as JSON with
# schema_user, and expand the parsed struct into top-level columns.
# The column is referenced as lower-case "data", matching the pin and geo
# cells, so the lookup does not rely on case-insensitive column resolution.

df_user = df_user \
    .withColumn("jsonData", df_user["data"].cast("string")) \
    .withColumn("parsedJson", from_json("jsonData", schema=schema_user)) \
    .select("parsedJson.*")

## Cleaning of user stream data

In [None]:
# Build the cleaned user DataFrame in a single projection:
#   ind         - 'index' renamed and cast to IntegerType
#   user_name   - first_name and last_name joined by a single space
#                 (the two source columns are dropped)
#   age         - cast to IntegerType
#   date_joined - formatted as "yyyy-MM-dd" and converted back to a date
df_user1 = df_user.select(
    col('index').cast(IntegerType()).alias('ind'),
    concat(col('first_name'), lit(' '), col('last_name')).alias('user_name'),
    col('age').cast(IntegerType()).alias('age'),
    to_date(date_format(col('date_joined'), "yyyy-MM-dd")).alias('date_joined'),
)

In [None]:
# Append the cleaned user stream to the Delta table 0e4753f224a7_user_table.
# The checkpoint directory is specific to this query: a streaming query's
# checkpoint location must not be shared with any other query.
df_user1.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/tmp/kinesis/user/_checkpoints/") \
  .table("0e4753f224a7_user_table")