In [0]:
from pyspark.sql.functions import to_date, col

### Read from bronze (landing) layer

In [0]:
# Give Spark access to the landing-zone Blob Storage account by registering its
# account key in the session config. The key comes from the Databricks secret
# scope "events" rather than being hard-coded in the notebook.
storage_account_name = "factoredatathon"
storage_account_key = dbutils.secrets.get(scope="events", key="landingBlobKey")
container_name = "bronze"

# Spark config key that the wasbs:// driver reads for this storage account.
account_key_setting = f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net"
spark.conf.set(account_key_setting, str(storage_account_key))

In [0]:
# Column names for the raw export files, in file order (58 fields). These
# presumably follow the GDELT 1.0 event export schema — confirm against the
# source codebook if files with a different field count are ever landed.
columns = [
    "GLOBALEVENTID", "SQLDATE", "MonthYear", "Year", "FractionDate",
    "Actor1Code", "Actor1Name", "Actor1CountryCode", "Actor1KnownGroupCode",
    "Actor1EthnicCode", "Actor1Religion1Code", "Actor1Religion2Code",
    "Actor1Type1Code", "Actor1Type2Code", "Actor1Type3Code", "Actor2Code",
    "Actor2Name", "Actor2CountryCode", "Actor2KnownGroupCode",
    "Actor2EthnicCode", "Actor2Religion1Code", "Actor2Religion2Code",
    "Actor2Type1Code", "Actor2Type2Code", "Actor2Type3Code", "IsRootEvent",
    "EventCode", "EventBaseCode", "EventRootCode", "QuadClass",
    "GoldsteinScale", "NumMentions", "NumSources", "NumArticles", "AvgTone",
    "Actor1Geo_Type", "Actor1Geo_FullName", "Actor1Geo_CountryCode",
    "Actor1Geo_ADM1Code", "Actor1Geo_Lat", "Actor1Geo_Long",
    "Actor1Geo_FeatureID", "Actor2Geo_Type", "Actor2Geo_FullName",
    "Actor2Geo_CountryCode", "Actor2Geo_ADM1Code", "Actor2Geo_Lat",
    "Actor2Geo_Long", "Actor2Geo_FeatureID", "ActionGeo_Type",
    "ActionGeo_FullName", "ActionGeo_CountryCode", "ActionGeo_ADM1Code",
    "ActionGeo_Lat", "ActionGeo_Long", "ActionGeo_FeatureID", "DATEADDED",
    "SOURCEURL"
]

# The bronze files are tab-separated with no header row, so Spark assigns
# placeholder names (_c0, _c1, ...) that `toDF` replaces with `columns`.
# No schema is given and inference is off, so every field is read as a string.
# `toDF` raises if the file's field count differs from len(columns).
# The glob only matches the upper-case ".CSV" extension.
file_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/*.CSV"
df = (
    spark.read
    .options(header="false", delimiter="\t")
    .csv(file_path)
    .toDF(*columns)
)

In [0]:
# Load the reference table of target countries from Azure SQL Database.
# The next cell uses its `countryCode` column to restrict the events.

server = "factoredata2024.database.windows.net"
# NOTE(review): this database name differs from the server name only in its
# first letter ("d" vs "f"). Confirm it is the real database name and not a typo.
db = "dactoredata2024"
user = "factoredata2024admin"
password = dbutils.secrets.get(scope="events", key="ASQLPassword")

# JDBC URL intentionally carries no credentials. User and password are supplied
# only through `connection_properties`, so the secret is not embedded in a URL
# string that may be echoed by error messages, logs or query plans. The SQL
# Server driver already gave the Properties collection precedence over URL
# values, so the effective login is unchanged.
jdbc_url = (
    f"jdbc:sqlserver://{server}:1433;database={db};"
    "encrypt=true;trustServerCertificate=false;"
    "hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
)

connection_properties = {
    "user": f"{user}@{server}",
    "password": password,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
}

# Schema-qualified table name (SQL Server bracket quoting).
table_name = "[gkg].[50countries]"

# Read the reference table into a Spark DataFrame.
countries50 = spark.read.jdbc(url=jdbc_url, table=table_name, properties=connection_properties)

In [0]:
# Collect the (small) set of target country codes to the driver using the
# DataFrame API. The original used `.rdd`, which is unavailable under Spark
# Connect and on Databricks shared-access clusters. Duplicates and nulls are
# dropped because neither can make `isin` match, so the filter result is
# the same as before.
filter_values = [
    row["countryCode"]
    for row in countries50.select("countryCode").distinct().collect()
    if row["countryCode"] is not None
]

# Keep only events whose action location is one of the target countries.
df = df.filter(col("ActionGeo_CountryCode").isin(filter_values))

### Write to silver layer

In [0]:
# Parse the 'YYYYMMDD' `SQLDATE` string into a DateType `DATE` column and keep
# only events strictly after the cutoff date. Rows whose SQLDATE cannot be
# parsed get a null DATE and therefore fail the comparison.
cutoff_date = "2023-08-13"
# Fixed partition count for the write; roughly bounds the number of output files.
num_output_partitions = 32
# DBFS location of the silver-layer Delta table (fully overwritten on each run).
delta_path = "/mnt/silver/eventsSilver"

df = (
    df
    .withColumn("DATE", to_date(col("SQLDATE"), "yyyyMMdd"))
    .filter(col("DATE") > cutoff_date)
    .repartition(num_output_partitions)
)
df.write.format("delta").mode("overwrite").save(delta_path)