In [0]:
from pyspark.sql import *
from pyspark.sql.functions import *


In [0]:
# Obtain the Spark session for this notebook: getOrCreate() returns the
# active session when there is one and builds a new one otherwise.
spark = (
    SparkSession.builder
    .getOrCreate()
)

In [0]:
# Connection settings for the Azure SQL Server source database.
# NOTE(review): the database name, user and password are blank here —
# presumably placeholders to fill in before running; confirm, and prefer a
# secret store over literals in the notebook.
jdbcDriver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbcHostname = "server_name.database.windows.net"
jdbcPort = 1433
jdbcDatabase = ""
jdbcUsername = ""
jdbcPass = ""

# JDBC URL: the host:port endpoint followed by ';'-separated connection
# properties (database name, user, password).
jdbcUrl = (
    f"jdbc:sqlserver://{jdbcHostname}:{jdbcPort}"
    f";databaseName={jdbcDatabase}"
    f";user={jdbcUsername}"
    f";password={jdbcPass}"
)

In [0]:
# Reading Table Data
def read_tables(url,table_name):
    """Load one database table over JDBC using the notebook-level ``spark`` session.

    Args:
        url: JDBC connection URL, passed as the reader's ``url`` option.
        table_name: Table to read, passed as the reader's ``dbtable`` option
            (for example ``"exe.us_daily"``).

    Returns:
        Whatever the configured JDBC reader's ``load()`` returns
        (a Spark DataFrame when run against a real session).
    """
    jdbc_reader = spark.read.format("jdbc")
    jdbc_reader = jdbc_reader.option("url", url)
    jdbc_reader = jdbc_reader.option("dbtable", table_name)
    return jdbc_reader.load()

In [0]:
# Names of the source tables to load; each is read from the "exe" schema.
table_list = [
    "CountryCodeQS",
    "County_Population",
    "enigmaJHU",
    "states_abv",
    "states_daily",
    "us_county",
    "us_daily",
    "us_population_total_test",
    "us_states",
    "usa_hospital_beds",
]

# Read every table in table_list once (in list order), keyed by table name.
# Driving the reads from the list keeps the list and the loaded tables in sync
# instead of repeating the read_tables call by hand for each table.
source_tables = {name: read_tables(jdbcUrl, f"exe.{name}") for name in table_list}

# Bind the per-table variable names that the rest of the notebook refers to.
CountryCodeQS = source_tables["CountryCodeQS"]
County_Population = source_tables["County_Population"]
enigmaJHU = source_tables["enigmaJHU"]
states_abv = source_tables["states_abv"]
states_daily = source_tables["states_daily"]
us_county = source_tables["us_county"]
us_daily = source_tables["us_daily"]
us_population_total_test = source_tables["us_population_total_test"]
us_states = source_tables["us_states"]
usa_hospital_beds = source_tables["usa_hospital_beds"]

In [0]:
# Show the enigmaJHU DataFrame in the cell output
# (display is presumably the notebook's built-in rendering helper).
display(enigmaJHU)

In [0]:
# Alternative method: connect with a complete JDBC connection string and read
# a table through spark.read.jdbc.
# The adjacent literals below are concatenated into one ';'-separated
# connection string.
conn_test = (
    "jdbc:sqlserver://server.database.windows.net:1433;"
    "database=Database;"
    "user=user;"
    "password={paass};"
    "encrypt=true;"
    "trustServerCertificate=false;"
    "hostNameInCertificate=*.database.windows.net;"
    "loginTimeout=30;"
)

# Read exe.states_abv through this connection and show it.
df = spark.read.jdbc(
    url=conn_test,
    table="exe.states_abv",
)
display(df)

In [0]:
# Create the fact table: selected enigmaJHU columns left-joined to selected
# states_daily columns on the FIPS code. The left join keeps every enigmaJHU row.
fact_covid_temp1 = enigmaJHU.select(
    "fips",
    "province_state",
    "country_region",
    "confirmed",
    "deaths",
    "recovered",
    "active",
)
# The states_daily key is exposed as fips_1 (presumably to keep it distinct
# from enigmaJHU's fips column in the joined result).
fact_covid_temp2 = states_daily.select(
    col("fips").alias("fips_1"),
    "date",
    "positive",
    "negative",
    "hospitalizedcurrently",
    "hospitalized",
    "hospitalizeddischarged",
)
fact_covid = fact_covid_temp1.join(
    fact_covid_temp2,
    on=fact_covid_temp1["fips"] == fact_covid_temp2["fips_1"],
    how="left",
)


In [0]:
# Create the region dimension table: location columns from enigmaJHU
# inner-joined to the county/state columns of us_county on the FIPS code.
# Only rows whose FIPS code appears in both inputs are kept.
dim_region_1 = enigmaJHU.select(
    "fips",
    "province_state",
    "country_region",
    "latitude",
    "longitude",
)
dim_region_2 = us_county.select(
    col("fips").alias("fips_1"),
    "county",
    "state",
)
dim_region = dim_region_1.join(
    dim_region_2,
    on=dim_region_1["fips"] == dim_region_2["fips_1"],
    how="inner",
)

In [0]:
# Create the time dimension table from states_daily.

# Step 1: cast the date column to a string so it can be parsed by pattern.
# The cast uses the type name "string" rather than StringType(): StringType
# lives in pyspark.sql.types, which this notebook never imports explicitly,
# so relying on it only works when a star import happens to expose it.
dim_Date_t = states_daily.select("fips", col("date").cast("string").alias("date"))

# Step 2: parse the yyyyMMdd string into a proper date value.
dim_Date_temp = dim_Date_t.select("fips", expr('to_date(date, "yyyyMMdd") as date'))

# Step 3: derive the calendar year and month from the parsed date.
dim_Date = dim_Date_temp.select(
    "fips",
    "date",
    year(col("date")).alias("year"),
    month(col("date")).alias("month"),
)

In [0]:
# Storing DataFrames to Azure Storage.

# Create the mount point for the Blob Storage container.
# dbutils.fs.mount raises if the mount point is already in use, so mount only
# when it is absent; this keeps the cell safe to re-run.
# Note: an existing mount at this path is left untouched, whatever its source.
mount_point = "/mnt/iotdata"
already_mounted = any(m.mountPoint == mount_point for m in dbutils.fs.mounts())

if not already_mounted:
    dbutils.fs.mount(
        source="wasbs://<container-name>@<storage-account-name>.blob.core.windows.net",
        mount_point=mount_point,
        # The storage account key is fetched from a secret scope rather than
        # being written into the notebook.
        extra_configs={
            "fs.azure.account.key.<storage-account-name>.blob.core.windows.net": dbutils.secrets.get(
                scope="<scope-name>", key="<key-name>"
            )
        },
    )

In [0]:
# Write the fact and dimension tables back to storage as CSV, overwriting any
# existing output at each path.
# NOTE(review): these paths are relative ("mnt/Covid_data/...") while the mount
# point created in this notebook is "/mnt/iotdata" — confirm the intended target.
csv_outputs = [
    (fact_covid, "mnt/Covid_data/fact_covid"),
    (dim_region, "mnt/Covid_data/dim_region"),
    (dim_Date, "mnt/Covid_data/dim_Date"),
]

for frame, out_path in csv_outputs:
    csv_writer = frame.write.format("csv").mode("Overwrite")
    csv_writer.option("path", out_path).save()