In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import boto3
import json
import random
import time
import urllib
import urllib.parse

In [0]:
# Step 1: Read in credentials from the Delta table.
# The table is expected to hold (at least) one row with the columns
# 'Access key ID' and 'Secret access key'; only the first row is used.
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Read the Delta table to a Spark DataFrame
credentials_df = spark.read.format("delta").load(delta_table_path)

# Fetch the single row we need once, instead of collecting the whole table to
# the driver twice (once per column).
credentials_row = credentials_df.select('Access key ID', 'Secret access key').first()
if credentials_row is None:
    raise ValueError(f"No credentials found in Delta table at {delta_table_path}")

ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']

# URL-encode the secret key (safe="" also encodes '/'), e.g. for embedding in a URI.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

# COMMAND ----------

# Disable Delta format checks in SQL
spark.conf.set("spark.databricks.delta.formatCheck.enabled", "false")

In [0]:

# Step 2: Ingest data into Kinesis Data Streams
# boto3 Kinesis client authenticated with the keys loaded from the credentials
# table. NOTE: the region is hard-coded; change it if your streams live elsewhere.
kinesis_client = boto3.client(
    service_name='kinesis',
    region_name='us-east-1',
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY,
)

# Function to ingest data into Kinesis
def ingest_data_to_kinesis(stream_name, data, partition_key=None, client=None):
    """Serialize ``data`` as JSON and write it to a Kinesis stream as one record.

    Args:
        stream_name: Name of the target Kinesis data stream.
        data: JSON-serializable payload; sent as ``json.dumps(data)``.
        partition_key: Partition key for the record (converted to ``str``).
            Defaults to a random integer in [1, 1000]. Records with random keys
            get no per-key grouping, so pass a stable key when related records
            must land together.
        client: Kinesis client exposing ``put_record``. Defaults to the
            notebook-level ``kinesis_client``.

    Returns:
        The raw ``put_record`` response dict.
    """
    if client is None:
        client = kinesis_client
    if partition_key is None:
        partition_key = random.randint(1, 1000)
    return client.put_record(
        StreamName=stream_name,
        Data=json.dumps(data),
        PartitionKey=str(partition_key),
    )

# Example data to ingest: one small JSON document per stream
example_data_pin = {"example_field": "example_value_pin"}
example_data_geo = {"example_field": "example_value_geo"}
example_data_user = {"example_field": "example_value_user"}

# Ingest example data into Kinesis streams. Keep each put_record response
# (shard id + sequence number) so it can be inspected/printed later.
response_pin = ingest_data_to_kinesis("streaming-1226d593b7e7-pin", example_data_pin)
response_geo = ingest_data_to_kinesis("streaming-1226d593b7e7-geo", example_data_geo)
response_user = ingest_data_to_kinesis("streaming-1226d593b7e7-user", example_data_user)

# Add a sleep to ensure data is ingested before reading (optional)
time.sleep(15)


In [0]:
# Step 3: Read the data from the Kinesis streams
def read_data_from_kinesis(stream_name, shard_id='shardId-000000000000',
                           iterator_type='TRIM_HORIZON', limit=10,
                           max_calls=5, client=None):
    """Return up to ``limit`` records from one shard of a Kinesis stream.

    Args:
        stream_name: Name of the Kinesis data stream.
        shard_id: Shard to read from (defaults to the first shard).
        iterator_type: ``ShardIteratorType`` for ``get_shard_iterator``.
            Defaults to ``'TRIM_HORIZON'`` (oldest retained record). ``'LATEST'``
            starts *after* the newest record, so it never returns records that
            were written before this call (such as those ingested just above).
        limit: Maximum number of records requested per ``get_records`` call.
        max_calls: Upper bound on ``get_records`` calls (at least one is made).
            Kinesis may return an empty batch while the iterator is still
            behind the shard tip, so the next iterator is followed until records
            arrive or ``MillisBehindLatest`` reaches 0.
        client: Kinesis client exposing ``get_shard_iterator``/``get_records``.
            Defaults to the notebook-level ``kinesis_client``.

    Returns:
        The ``Records`` list from the first non-empty batch, or the last
        (empty) batch if none arrived within ``max_calls`` calls.
    """
    if client is None:
        client = kinesis_client

    shard_iterator = client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType=iterator_type,
    )['ShardIterator']

    records = []
    for _ in range(max(1, max_calls)):
        if shard_iterator is None:
            # No further iterator (Kinesis omits it for a closed, fully-read shard).
            break
        response = client.get_records(ShardIterator=shard_iterator, Limit=limit)
        records = response['Records']
        # Stop once we have data or have caught up with the shard tip.
        if records or response.get('MillisBehindLatest', 0) == 0:
            break
        shard_iterator = response.get('NextShardIterator')
    return records

# Pull a batch from each of the three streams (pin, geo, user), in that order
pin_data, geo_data, user_data = (
    read_data_from_kinesis(stream)
    for stream in (
        "streaming-1226d593b7e7-pin",
        "streaming-1226d593b7e7-geo",
        "streaming-1226d593b7e7-user",
    )
)

# Show what came back from each stream
for caption, records in (
    ("Pin Data:", pin_data),
    ("Geo Data:", geo_data),
    ("User Data:", user_data),
):
    print(caption, records)

In [0]:
# Step 4: Read from Kinesis stream into Spark DataFrame
# Structured Streaming source over the pin stream, starting at the earliest
# retained record and authenticated with the keys from the credentials table.
df_pin_data = (
    spark.readStream
    .format('kinesis')
    .options(
        streamName='streaming-1226d593b7e7-pin',
        initialPosition='earliest',
        region='us-east-1',
        awsAccessKey=ACCESS_KEY,
        awsSecretKey=SECRET_KEY,
    )
    .load()
)

display(df_pin_data)


partitionKey,data,stream,shardId,sequenceNumber,approximateArrivalTimestamp
test,eyJpbmRleCI6NzUyOCwidW5pcXVlX2lkIjoiZmJlNTNjNjYtMzQ0Mi00NzczLWIxOWUtZDNlYzZmNTRkZGRmIiwidGl0bGUiOiJObyBUaXRsZSBEYXRhIEF2YWlsYWJsZSIsImRlc2NyaXB0aW9uIjoiTm8gZGVzY3JpcHRpb24= (truncated),streaming-1226d593b7e7-pin,shardId-000000000000,49656150928222836118193644263058291322098633406716837890,2024-09-26T14:01:38.575+0000
test,eyJpbmRleCI6Mjg2MywidW5pcXVlX2lkIjoiOWJmMzk0MzctNDJhNi00ZjAyLTk5YTAtOWEwMzgzZDhjZDcwIiwidGl0bGUiOiIyNSBTdXBlciBGdW4gU3VtbWVyIENyYWZ0cyBmb3IgS2lkcyAtIE9mIExpZmUgYW5kIExpc2E= (truncated),streaming-1226d593b7e7-pin,shardId-000000000000,49656150928222836118193644268178092168166588167755923458,2024-09-26T14:01:41.077+0000
test,eyJpbmRleCI6NTczMCwidW5pcXVlX2lkIjoiMWUxZjBjOGItOWZjZi00NjBiLTkxNTQtYzc3NTgyNzIwNmViIiwidGl0bGUiOiJJc2xhbmQgT2FzaXMgQ291cG9uIE9yZ2FuaXplciIsImRlc2NyaXB0aW9uIjoiRGVzY3JpcHQ= (truncated),streaming-1226d593b7e7-pin,shardId-000000000000,49656150928222836118193644273140732657684641067363729410,2024-09-26T14:01:43.548+0000
test,eyJpbmRleCI6ODMwNCwidW5pcXVlX2lkIjoiNWI2ZDA5MTMtMjVlNC00M2FiLTgzOWQtODVkNTUxNmY3OGE0IiwidGl0bGUiOiJUaGUgIzEgUmVhc29uIFlvdeKAmXJlIE5vdCBIaXMgUHJpb3JpdHkgQW55bW9yZSAtIE1hdHQ= (truncated),streaming-1226d593b7e7-pin,shardId-000000000000,49656150928222836118193644277605295709521466815712067586,2024-09-26T14:01:46.229+0000
test,eyJpbmRleCI6ODczMSwidW5pcXVlX2lkIjoiZWE3NjBmNzEtZmViZi00MDIzLWI1OTItZDE3Mzk2NjU5MDM5IiwidGl0bGUiOiIyMCBLb2kgRmlzaCBUYXR0b29zIEZvciBMdWNreSBNZW4iLCJkZXNjcmlwdGlvbiI6IktvaSA= (truncated),streaming-1226d593b7e7-pin,shardId-000000000000,49656150928222836118193644280615521000361893598169399298,2024-09-26T14:01:47.958+0000
test,eyJpbmRleCI6MTMxMywidW5pcXVlX2lkIjoiNDQ2NjIwNDUtZTg5MS00ODIxLThhMTktZWJlN2VlZGQzNzFhIiwidGl0bGUiOiJMaXF1aWQgTGFzaCBFeHRlbnNpb25zIE1hc2NhcmEiLCJkZXNjcmlwdGlvbiI6Ikluc3RhbnQ= (truncated),streaming-1226d593b7e7-pin,shardId-000000000000,49656150928222836118193644283347693352690955601724833794,2024-09-26T14:01:49.594+0000
test,eyJpbmRleCI6NDMxNSwidW5pcXVlX2lkIjoiMjFiNTliYTktODI5ZC00YzMzLThjMjctNGNkNGM1NmQyNmI4IiwidGl0bGUiOiJQb2RjYXN0cyBmb3IgVGVhY2hlcnMgb3IgUGFyZW50cyBvZiBUZWVuYWdlcnMiLCJkZXNjcmk= (truncated),streaming-1226d593b7e7-pin,shardId-000000000000,49656150928222836118193644287471339323396455854086553602,2024-09-26T14:01:51.980+0000
test,eyJpbmRleCI6MTA3OTQsInVuaXF1ZV9pZCI6ImM0YmQyNTc3LWE3YmItNDQwOS1iYjdhLTE3ZDVlZDdlMWNmMSIsInRpdGxlIjoiVGlyZUJ1eWVyIiwiZGVzY3JpcHRpb24iOiJOaXNzYW4gR1QtUi4gU2ljay4iLCJwb3N0ZXI= (truncated),streaming-1226d593b7e7-pin,shardId-000000000000,49656150928222836118193644290099544055238659817336733698,2024-09-26T14:01:53.453+0000
test,eyJpbmRleCI6NTQ5NCwidW5pcXVlX2lkIjoiOGZiMmFmNjgtNTQzYi00NjM5LTgxMTktZGUzM2QyODcwNmVkIiwidGl0bGUiOiJEYXZlIFJhbXNleSdzIDcgQmFieSBTdGVwczogV2hhdCBBcmUgVGhleSBBbmQgV2lsbCBUaGU= (truncated),streaming-1226d593b7e7-pin,shardId-000000000000,49656150928222836118193644292512559991189459718769737730,2024-09-26T14:01:54.929+0000
test,eyJpbmRleCI6NTA2OSwidW5pcXVlX2lkIjoiYjc1YjZmODctZGViMy00NDRmLWIyOWUtY2U5MTYxYjJkZjQ5IiwidGl0bGUiOiJUaGUgVmF1bHQ6IEN1cmF0ZWQgJiBSZWZpbmVkIFdlZGRpbmcgSW5zcGlyYXRpb24iLCJkZXM= (truncated),streaming-1226d593b7e7-pin,shardId-000000000000,49656150928222836118193644295006573957054439843627532290,2024-09-26T14:01:56.364+0000


In [0]:
from pyspark.sql.functions import col

# Step 1: Read the existing Delta table and cast 'follower_count' to StringType
df_existing = (
    spark.table("1226d593b7e7_pin_table")
    .withColumn("follower_count", col("follower_count").cast("string"))
)

# Step 2: Write the corrected data to a temporary Delta table with the new schema
(
    df_existing.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("1226d593b7e7_pin_table_temp")
)

# Step 3: Replace the old table with the new table (with the updated schema).
# Copy from the temp table written above: df_existing lazily reads
# 1226d593b7e7_pin_table itself, so using it here would read from and overwrite
# the same table in one job.
(
    spark.table("1226d593b7e7_pin_table_temp")
    .write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("1226d593b7e7_pin_table")
)

# Step 4: Reset the pin stream's checkpoint so the query below starts fresh.
# Only the pin/ subdirectory is removed, so checkpoints of any other streams kept
# under /tmp/kinesis/_checkpoints/ are left intact.
pin_checkpoint_dir = "/tmp/kinesis/_checkpoints/pin/"
dbutils.fs.rm(pin_checkpoint_dir, True)
dbutils.fs.mkdirs(pin_checkpoint_dir)

# Step 5: Enable schema auto-merge in Spark config
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

# Step 6: Build df_pin from the raw Kinesis stream (df_pin_data). The raw source
# only has partitionKey/data/stream/shardId/sequenceNumber/approximateArrivalTimestamp;
# the JSON record lives in the binary `data` column and must be decoded first.
# NOTE(review): parses with the (already migrated) target table schema, assuming the
# JSON payload fields match the table's columns — confirm against the producer.
df_pin = (
    df_pin_data
    .select(from_json(col("data").cast("string"), df_existing.schema).alias("record"))
    .select("record.*")
    .withColumn("follower_count", col("follower_count").cast("string"))
)

(
    df_pin.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", pin_checkpoint_dir)
    .option("mergeSchema", "true")
    .table("1226d593b7e7_pin_table")
)








In [0]:
# COMMAND ----------


# Print the put_record responses. NOTE: requires response_pin / response_geo /
# response_user to have been assigned earlier in the notebook.
response_caption = "Ingestion Response ({}):"
print(response_caption.format("Pin"), response_pin)
print(response_caption.format("Geo"), response_geo)
print(response_caption.format("User"), response_user)
