### Import

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import StructField, StringType, StructType, TimestampType, IntegerType
import urllib
import urllib.parse



### Reading the Kinesis streams

In [0]:

# Location of the Delta table that stores the AWS credentials
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Load that Delta table as a batch Spark DataFrame
aws_keys_df = spark.read.load(delta_table_path, format="delta")

Read the credentials table and extract the AWS access key and secret key needed to connect to Kinesis.

In [0]:
# Get the AWS access key and secret key from the credentials DataFrame.
# Selecting both columns and calling `.first()` runs a single Spark job,
# instead of one `collect()` per key.
credentials_row = aws_keys_df.select('Access key ID', 'Secret access key').first()
if credentials_row is None:
    raise ValueError("The credentials Delta table is empty: no AWS keys were found.")
ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']
# URL-encode the secret key; safe="" means even '/' is percent-encoded.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")




Create a function that reads a Kinesis stream and deserialises its JSON payload.

In [0]:
def read_kinesis_data(stream_name, json_schema, region='us-east-1', initial_position='earliest'):
    '''Read a Kinesis stream with Spark Structured Streaming and deserialise its payload.

    The deserialisation:
    - casts the binary 'data' field of each Kinesis record to a string,
    - parses that string with `from_json` using the provided JSON schema,
    - flattens the parsed struct so every schema field becomes a top-level column.

    Args:
        stream_name (`string`): Name of the Kinesis stream to read from.
        json_schema (`StructType`): Schema used to parse the JSON payload; its fields
            become the columns of the returned DataFrame.
        region (`string`): AWS region of the stream, passed to the `region` option.
            Defaults to 'us-east-1'.
        initial_position (`string`): Value passed to the `initialPosition` option.
            Defaults to 'earliest'.

    Returns:
        df (`pyspark.sql.DataFrame`): A streaming DataFrame with one column per field
            of `json_schema`.

    Raises:
        ValueError: If `stream_name` is empty (e.g. the placeholder was not filled in).
    '''
    if not stream_name:
        raise ValueError("stream_name must be a non-empty Kinesis stream name.")

    # Raw Kinesis records; the payload is carried in the binary `data` column.
    kinesis_stream = (
        spark.readStream
        .format('kinesis')
        .option('streamName', stream_name)
        .option('initialPosition', initial_position)
        .option('region', region)
        .option('awsAccessKey', ACCESS_KEY)
        .option('awsSecretKey', SECRET_KEY)
        .load()
    )

    # bytes -> JSON string -> struct -> flattened columns.
    # Parenthesised chaining avoids `\` continuations, where a stray trailing
    # space after the backslash is a SyntaxError.
    df = (
        kinesis_stream
        .selectExpr("CAST(data as STRING)")
        .withColumn("data", from_json(col("data"), json_schema))
        .select(col("data.*"))
    )
    return df


### Writing to Delta Tables


A function is created to write transformed data to delta tables.

In [0]:
def write_kinesis_data(table_name, df, checkpoint_root="/tmp/kinesis"):
    '''Start a streaming query that appends `df` to the Delta table `table_name`.

    The query uses a per-table checkpoint directory. Structured Streaming records
    its progress there, so a restarted query resumes where it stopped instead of
    starting over. (The checkpoint is not a data backup; Delta table history is
    what allows reverting a table to an older state.)

    Args:
        table_name (`string`): The desired name for the Delta table.
        df (`pyspark.sql.DataFrame`): The streaming DataFrame to write.
        checkpoint_root (`string`): Directory under which the
            `<table_name>_checkpoints/` folder is created. Defaults to '/tmp/kinesis'.

    Returns:
        `pyspark.sql.streaming.StreamingQuery`: Handle to the started query
            (e.g. to stop it or inspect its status).

    Raises:
        ValueError: If `table_name` is empty (e.g. the placeholder was not filled in).
    '''
    if not table_name:
        raise ValueError("table_name must be a non-empty Delta table name.")

    # Parenthesised chaining instead of `\` continuations (a trailing space
    # after a backslash is a SyntaxError).
    return (
        df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", f"{checkpoint_root}/{table_name}_checkpoints/")
        .table(table_name)
    )


### Cleaning
Read the pin streaming data, clean it, and write it to the appropriate Delta table. Replace these placeholders in the cell before running:
- stream_name = '<name_of_kinesis_stream>'
- table_name = '<desired_table_name>'

In [0]:
# Name of the Kinesis stream carrying the pin records; replace the placeholder before running.
stream_name = ''

# Schema of the JSON payload in each pin record; every field becomes a column of `df_pin`.
# 'follower_count' is read as a string because it may carry 'k'/'M' suffixes (cleaned below).
json_schema = StructType([
    StructField('index', IntegerType()),
    StructField('unique_id', StringType()),
    StructField('title', StringType()),
    StructField('description', StringType()),
    StructField('follower_count', StringType()),
    StructField('poster_name', StringType()),
    StructField('tag_list', StringType()),
    StructField('is_image_or_video', StringType()),
    StructField('image_src', StringType()),
    StructField('save_location', StringType()),
    StructField('category', StringType())
])

# Read and deserialise the pin stream with the shared reader defined earlier in the notebook.
df_pin = read_kinesis_data(stream_name, json_schema)

# Replace placeholder strings that stand for "no data" with nulls. Keeping the
# column -> placeholder mapping in a dictionary makes it easy to extend.
col_and_entries_to_replace = {
    'title': 'No Title Data Available',
    'description': 'No description available Story format',
    'follower_count': 'User Info Error',
    'poster_name': 'User Info Error',
    'image_src': 'Image src error.',
    'tag_list': 'N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e'
}

for column_name, placeholder in col_and_entries_to_replace.items():
    df_pin = df_pin.withColumn(
        column_name,
        when(col(column_name) == placeholder, None).otherwise(col(column_name)),
    )

# Expand 'k' / 'M' suffixed follower counts ('1.5k' -> 1500, '2M' -> 2000000) and cast to int.
# The numeric part is parsed as an exact decimal and then scaled. A textual
# 'k' -> '000' replacement would turn '1.5k' into '1.5000' instead of 1500.
raw_follower_count = col('follower_count')
follower_count_number = regexp_replace(raw_follower_count, '[kM]$', '').cast('decimal(18,3)')
df_pin = df_pin.withColumn(
    'follower_count',
    when(raw_follower_count.endswith('k'), follower_count_number * 1000)
    .when(raw_follower_count.endswith('M'), follower_count_number * 1000000)
    .otherwise(follower_count_number)
    .cast('int'),
)

# Strip the 'Local save in ' prefix so 'save_location' holds only the path.
df_pin = df_pin.withColumn('save_location', regexp_replace(col('save_location'), 'Local save in ', ''))

# Name of the target Delta table; replace the placeholder before running.
table_name = ''

# Stream the cleaned pin data into the Delta table.
write_kinesis_data(table_name, df_pin)




Read the geo streaming data, clean it, and write it to the appropriate Delta table. Replace these placeholders in the cell before running:
- stream_name = '<name_of_kinesis_stream>'
- table_name = '<desired_table_name>'

In [0]:

# Name of the Kinesis stream carrying the geo records; replace the placeholder before running.
# It is set explicitly here. Otherwise this cell would silently read whatever
# stream name an earlier cell left in `stream_name`.
stream_name = ''

# Schema of the JSON payload in each geo record.
json_schema = StructType([
    StructField('ind', IntegerType()),
    StructField('country', StringType()),
    StructField('latitude', StringType()),
    StructField('longitude', StringType()),
    StructField('timestamp', TimestampType())])

# Read and deserialise the geo stream with the shared reader defined earlier in the notebook.
df_geo = read_kinesis_data(stream_name, json_schema)


# Transform the geo stream in one chain:
#  - pack latitude/longitude into a single 'coordinates' array column,
#  - drop the now-redundant 'latitude' and 'longitude' columns,
#  - make sure 'timestamp' is a timestamp type.
df_geo = (
    df_geo
    .withColumn('coordinates', array(col('latitude'), col('longitude')))
    .drop('latitude', 'longitude')
    .withColumn('timestamp', to_timestamp(col('timestamp')))
)

# Name of the target Delta table (placeholder to fill in)
table_name = ''

# Stream the cleaned geo data into the Delta table
write_kinesis_data(table_name, df_geo)



Read the user streaming data, clean it, and write it to the appropriate Delta table. Replace these placeholders in the cell before running:
- stream_name = '<name_of_kinesis_stream>'
- table_name = '<desired_table_name>'

In [0]:

# Name of the Kinesis stream carrying the user records; replace the placeholder before running.
# It is set explicitly here. Otherwise this cell would silently read whatever
# stream name an earlier cell left in `stream_name`.
stream_name = ''

# Schema of the JSON payload in each user record.
json_schema = StructType([
    StructField('ind', IntegerType()),
    StructField('first_name', StringType()),
    StructField('last_name', StringType()),
    StructField('age', IntegerType()),
    StructField('date_joined', TimestampType())])

# Read and deserialise the user stream with the shared reader defined earlier in the notebook.
# The schema must be passed explicitly; the reader has no default for it.
df_user = read_kinesis_data(stream_name, json_schema)

# Clean the user stream in one chain:
#  - combine first and last name into 'user_name' (`concat` yields null if either part is null),
#  - drop the separate name columns,
#  - make sure 'date_joined' is a timestamp type,
#  - put the columns in their final order.
df_user = (
    df_user
    .withColumn('user_name', concat(col('first_name'), lit(' '), col('last_name')))
    .drop('first_name', 'last_name')
    .withColumn('date_joined', to_timestamp(col('date_joined')))
    .select('ind', 'user_name', 'age', 'date_joined')
)

# Name of the target Delta table; replace the placeholder before running.
table_name = ''

# Stream the cleaned user data into the Delta table.
write_kinesis_data(table_name, df_user)


