In [0]:
import urllib
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, FloatType

# Define a function to read AWS credentials from Delta table
def read_aws_credentials_from_delta(delta_table_path):
    """Load the AWS access key pair stored in a Delta table.

    The table is read through the notebook-provided ``spark`` session and the
    key pair is taken from its first row.

    Args:
        delta_table_path: Location of the Delta table. It must expose the
            columns ``Access key ID`` and ``Secret access key``.

    Returns:
        tuple: ``(ACCESS_KEY, SECRET_KEY)``. The secret key is returned exactly
        as stored (it is not URL-encoded).

    Raises:
        ValueError: If the table contains no rows.
    """
    aws_keys_df = spark.read.format("delta").load(delta_table_path)
    # Fetch both columns from one row in a single job so the two values are
    # guaranteed to belong to the same credential record.
    first_row = aws_keys_df.select('Access key ID', 'Secret access key').first()
    if first_row is None:
        raise ValueError(f"No AWS credentials found in Delta table: {delta_table_path}")
    return first_row['Access key ID'], first_row['Secret access key']

# Define a function to read AWS credentials from CSV file
def read_aws_credentials_from_csv(csv_file_path):
    """Load the AWS access key pair stored in a CSV file.

    The file is read through the notebook-provided ``spark`` session as a
    comma-separated file with a header row, using an explicit three-column
    string schema, and the key pair is taken from its first row.

    Args:
        csv_file_path: Location of the CSV file with the columns
            ``User name``, ``Access key ID`` and ``Secret access key``.

    Returns:
        tuple: ``(ACCESS_KEY, SECRET_KEY)``. The secret key is returned exactly
        as stored (it is not URL-encoded).

    Raises:
        ValueError: If the file contains no data rows.
    """
    file_type = "csv"
    first_row_is_header = "true"
    delimiter = ","

    csv_schema = StructType([
        StructField("User name", StringType()),
        StructField("Access key ID", StringType()),
        StructField("Secret access key", StringType())
    ])

    aws_keys_df = spark.read.format(file_type)\
        .option("header", first_row_is_header)\
        .option("sep", delimiter)\
        .schema(csv_schema)\
        .load(csv_file_path)

    # Fetch both columns from one row in a single job so the two values are
    # guaranteed to belong to the same credential record.
    first_row = aws_keys_df.select('Access key ID', 'Secret access key').first()
    if first_row is None:
        raise ValueError(f"No AWS credentials found in CSV file: {csv_file_path}")
    return first_row['Access key ID'], first_row['Secret access key']

# Choose either Delta table or CSV file method based on where your credentials are stored
use_delta_table = True  # Set this to True if using Delta table, False if using CSV file

# Load ACCESS_KEY / SECRET_KEY from the selected store. A failure is logged and
# then re-raised: the Kinesis reader defined further down needs both names, so
# carrying on without them would only surface later as an unrelated NameError.
if use_delta_table:
    # Read AWS credentials from Delta table
    delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"
    try:
        ACCESS_KEY, SECRET_KEY = read_aws_credentials_from_delta(delta_table_path)
    except Exception as e:
        print(f"Error reading AWS credentials from Delta table: {str(e)}")
        raise
else:
    # Read AWS credentials from CSV file
    csv_file_path = "/FileStore/tables/authentication_credentials.csv"
    try:
        ACCESS_KEY, SECRET_KEY = read_aws_credentials_from_csv(csv_file_path)
    except Exception as e:
        print(f"Error reading AWS credentials from CSV file: {str(e)}")
        raise

# Define a function to read data from Kinesis stream using structured streaming
def read_kinesis_stream(stream_name, region_name, schema,
                        aws_access_key=None, aws_secret_key=None):
    """Read a Kinesis stream as a streaming DataFrame of parsed JSON records.

    The ``data`` column of each Kinesis record is cast to a string, parsed as
    JSON with ``schema``, and the parsed fields are returned as top-level
    columns.

    Args:
        stream_name: Name of the Kinesis stream to read.
        region_name: AWS region of the stream.
        schema: Schema used by ``from_json`` to parse each record's payload.
        aws_access_key: Optional access key. When omitted, the notebook-level
            ``ACCESS_KEY`` is used.
        aws_secret_key: Optional secret key. When omitted, the notebook-level
            ``SECRET_KEY`` is used.

    Returns:
        A streaming DataFrame with one column per field of ``schema``.
    """
    # Fall back to the notebook-level credentials so existing three-argument
    # calls keep working unchanged.
    access_key = ACCESS_KEY if aws_access_key is None else aws_access_key
    secret_key = SECRET_KEY if aws_secret_key is None else aws_secret_key

    # NOTE(review): "format" and "inferSchema" are not among the documented
    # Kinesis source options and are presumably ignored; they are kept as-is
    # pending confirmation.
    return spark.readStream \
        .format("kinesis") \
        .option("streamName", stream_name) \
        .option("region", region_name) \
        .option("regionName", region_name) \
        .option("initialPosition", "TRIM_HORIZON") \
        .option("format", "json") \
        .option("awsAccessKey", access_key) \
        .option("awsSecretKey", secret_key) \
        .option("inferSchema", "true") \
        .load() \
        .withColumn("cast_data", from_json(col("data").cast("string"), schema)) \
        .select("cast_data.*")

# Define the schema for the pin stream
# Schema of the JSON payload on the pin stream; fields are appended in payload order.
pin_schema = (
    StructType()
    .add("index", IntegerType())
    .add("unique_id", StringType())
    .add("title", StringType())
    .add("description", StringType())
    .add("poster_name", StringType())
    .add("follower_count", StringType())
    .add("tag_list", StringType())
    .add("is_image_or_video", StringType())
    .add("image_src", StringType())
    .add("downloaded", IntegerType())
    .add("save_location", StringType())
    .add("category", StringType())
)

# Define the schema for the geo stream
# Schema of the JSON payload on the geo stream, built from (name, type) pairs.
geo_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("ind", IntegerType()),
        ("timestamp", TimestampType()),
        ("latitude", FloatType()),
        ("longitude", FloatType()),
        ("country", StringType()),
    )
])

# Define the schema for the user stream
# Schema of the JSON payload on the user stream; fields are appended in payload order.
user_schema = (
    StructType()
    .add("ind", IntegerType())
    .add("first_name", StringType())
    .add("last_name", StringType())
    .add("age", StringType())
    .add("date_joined", TimestampType())
)

# Read data from Kinesis streams
# One streaming DataFrame per Kinesis stream, each parsed with its own schema.
streaming_df_pin, streaming_df_geo, streaming_df_user = [
    read_kinesis_stream(kinesis_stream, "us-east-1", payload_schema)
    for kinesis_stream, payload_schema in (
        ("streaming-0e6999790cc9-pin", pin_schema),
        ("streaming-0e6999790cc9-geo", geo_schema),
        ("streaming-0e6999790cc9-user", user_schema),
    )
]

# Display the DataFrames
for _stream_df in (streaming_df_pin, streaming_df_geo, streaming_df_user):
    display(_stream_df)




ind,timestamp,latitude,longitude,country


In [0]:
# Cleaning and Transforming the Pin Stream:
from pyspark.sql.functions import regexp_replace, when, col, lit


def _replace_placeholders(df, column, placeholders, replacement):
    """Return ``df`` with each value of ``column`` listed in ``placeholders`` swapped for ``replacement``.

    Values not listed (including nulls) are left untouched.
    """
    return df.withColumn(
        column,
        when(col(column).isin(*placeholders), replacement).otherwise(col(column)),
    )


# (column, placeholder values to replace, replacement value)
_PIN_PLACEHOLDERS = [
    ("description", ["Untitled", "No description available", "No description available Story format"], "None"),
    ("tag_list", ["N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e", "0"], "None"),
    ("title", ["No Title Data Available"], "None"),
    ("follower_count", ["User Info Error"], "0"),
    ("image_src", ["Image src error."], "None"),
    ("poster_name", ["User Info Error"], "None"),
]
for _column, _placeholders, _replacement in _PIN_PLACEHOLDERS:
    streaming_df_pin = _replace_placeholders(streaming_df_pin, _column, _placeholders, _replacement)

# The former "downloaded" clean-up step is intentionally omitted: it compared an
# integer column against the string "None", and the column is not part of the
# final select below, so the step had no effect on the result.

# Convert follower_count ("892", "10k", "1.5M", ...) to an integer.
# The numeric part is scaled by the suffix instead of textually appending
# zeros, so decimal values such as "1.5k" become 1500 rather than "1.5000".
_followers_text = col("follower_count").cast("string")
_follower_multiplier = (
    when(_followers_text.endswith("M"), lit(1_000_000))
    .when(_followers_text.endswith("k"), lit(1_000))
    .otherwise(lit(1))
)
streaming_df_pin = streaming_df_pin.withColumn(
    "follower_count",
    (regexp_replace(_followers_text, "[kM]$", "").cast("double") * _follower_multiplier).cast("int"),
)

# Rename 'index' to 'ind'
streaming_df_pin = streaming_df_pin.withColumnRenamed("index", "ind")

# Remove "Local save in" from save_location
streaming_df_pin = streaming_df_pin.withColumn("save_location", regexp_replace("save_location", "Local save in ", ""))

# Select the desired columns
streaming_df_pin = streaming_df_pin.select(
    col("ind"), col("unique_id"), col("title"), col("description"),
    col("follower_count"), col("poster_name"), col("tag_list"),
    col("is_image_or_video"), col("image_src"), col("save_location"), col("category")
)

display(streaming_df_pin)


# Cleaning and Transforming the Geo Stream:
from pyspark.sql.functions import array

# Build the final geo layout in a single projection:
#   - latitude/longitude are folded into one "coordinates" array column
#   - "timestamp" is cast to the timestamp type
#   - columns are emitted in the order ind, country, coordinates, timestamp
streaming_df_geo = streaming_df_geo.select(
    col("ind"),
    col("country"),
    array("latitude", "longitude").alias("coordinates"),
    col("timestamp").cast("timestamp").alias("timestamp"),
)

display(streaming_df_geo)

# Cleaning and Transforming the User Stream:
from pyspark.sql.functions import concat, lit

# Build the final user layout in a single projection:
#   - first_name and last_name are joined with a space into "user_name"
#   - "date_joined" is cast to the timestamp type
#   - columns are emitted in the order ind, user_name, age, date_joined
streaming_df_user = streaming_df_user.select(
    col("ind"),
    concat(col("first_name"), lit(' '), col("last_name")).alias("user_name"),
    col("age"),
    col("date_joined").cast("timestamp").alias("date_joined"),
)

display(streaming_df_user)



ind,user_name,age,date_joined


In [0]:
#Save the cleaned and transformed streaming DataFrames as Delta Tables

# Root directory under which each table keeps its own streaming checkpoint.
CHECKPOINT_ROOT = "/tmp/kinesis/_checkpoints"


def write_stream_to_delta(streaming_df, table_name, checkpoint_root=CHECKPOINT_ROOT):
    """Start a streaming query that continuously appends ``streaming_df`` to a Delta table.

    Writing straight to the Delta sink replaces the previous approach of
    snapshotting an in-memory sink immediately after starting it, which only
    captured the rows that had arrived at that instant and kept accumulating
    the stream in driver memory.

    Args:
        streaming_df: Streaming DataFrame to persist.
        table_name: Name of the target Delta table.
        checkpoint_root: Directory under which the query's checkpoint is kept
            (one sub-directory per table).

    Returns:
        The handle of the started streaming query.
    """
    return (
        streaming_df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", f"{checkpoint_root}/{table_name}")
        .toTable(table_name)
    )


# Save the Pin Stream
pin_query = write_stream_to_delta(streaming_df_pin, "0e6999790cc9_pin_table")

# Save the Geo Stream
geo_query = write_stream_to_delta(streaming_df_geo, "0e6999790cc9_geo_table")

# Save the User Stream
user_query = write_stream_to_delta(streaming_df_user, "0e6999790cc9_user_table")

# Non-streaming views of the Delta tables. Each read reflects whatever the
# streaming queries have committed at the time it is evaluated, so re-run the
# displays to see newer data.
not_streaming_df_pin = spark.read.table("0e6999790cc9_pin_table")
not_streaming_df_geo = spark.read.table("0e6999790cc9_geo_table")
not_streaming_df_user = spark.read.table("0e6999790cc9_user_table")

# Display the non-streaming DataFrames
display(not_streaming_df_pin)
display(not_streaming_df_geo)
display(not_streaming_df_user)


ind,unique_id,title,description,follower_count,poster_name,tag_list,is_image_or_video,image_src,save_location,category


ind,country,coordinates,timestamp


ind,user_name,age,date_joined
