# **Connecting to the Azure Data Lake Storage using OAuth Token**
> ### **Mounting the _source_ container from the ADLS**

In [0]:
client_id = dbutils.secrets.get(scope = "OAuth-Secret-Scope",key ="clientid")
client_secret = dbutils.secrets.get(scope = "OAuth-Secret-Scope",key ="clientsecret")
tenant_id = dbutils.secrets.get(scope = "OAuth-Secret-Scope",key ="tenantid")

configs = {
    "fs.azure.account.auth.type":"OAuth",
    "fs.azure.account.oauth.provider.type":"org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id":client_id,
    "fs.azure.account.oauth2.client.secret":client_secret,
    "fs.azure.account.oauth2.client.endpoint":"https://login.microsoftonline.com/" + tenant_id +"/oauth2/token"
}

dbutils.fs.mount(
    source = 'abfss://source@spacexadls.dfs.core.windows.net/',
    mount_point = '/mnt/source',
    extra_configs= configs
)

## Implementing the medallion architecture by mounting _Bronze, Silver and Gold_ Layers

In [0]:
dbutils.fs.mount(
    source = 'abfss://bronze@spacexadls.dfs.core.windows.net/',
    mount_point = '/mnt/bronze',
    extra_configs= configs
)

dbutils.fs.mount(
    source = 'abfss://silver@spacexadls.dfs.core.windows.net/',
    mount_point = '/mnt/silver',
    extra_configs= configs
)

dbutils.fs.mount(
    source = 'abfss://gold@spacexadls.dfs.core.windows.net/',
    mount_point = '/mnt/gold',
    extra_configs= configs
)

# INGESTION OF RAW DATA

In [0]:
data = spark.read.csv("/mnt/source/ingested_data.csv",header=True,mode="DROPMALFORMED")

### Implementation of **_Bronze Layer_** in Medallion architecture

In [0]:
data.write.format("delta").mode("overwrite").save("/mnt/bronze/")

In [0]:
spark.sql("CREATE SCHEMA IF NOT EXISTS bronze"); 
data.write.format("delta").mode("overwrite").saveAsTable("bronze.ingested_data")

In [0]:
%sql
SELECT * FROM bronze.ingested_data;

In [0]:
df = spark.table('bronze.ingested_data')
display(df)

# TRANSFORMATION OF BRONZE LAYER DATA

### Dropping the _Price_ Column

In [0]:
df = df.drop('Price')
display(df)

### Dropping the null values from the _Time_ column

In [0]:
df = df.dropna(subset=["Time"])
display(df)

### Dropping the _Duplicate_ rows

In [0]:
df = df.dropDuplicates(subset=None)
display(df.count())


### Creating the new column to determine the Year of Launch and Dropping the Date Column

In [0]:
from pyspark.sql import functions as F
df = df.withColumn("year", F.year(F.col("Date")))
df = df.drop('Date')
display(df.printSchema())

### Removing the new line tag in the MissionStatus Column

In [0]:
df = df.withColumn("MissionStatus", F.element_at(F.split(df["MissionStatus"], "\n"), 1))

In [0]:
from pyspark.sql import functions as F

# Split the location by ", " and Exttract Country and City
df = df.withColumn("Country", F.element_at(F.split(df["Location"], ", "), -1))
df = df.withColumn("City",F.element_at(F.split(df["Location"], ", "), -2))

#drop the Location column
df = df.drop("Location")

df = df.withColumn("Country",F.when(df["Country"] == "USA", "United States").otherwise(df["Country"]))
# Preview cleaned locations
display(df)


### Implementation of **_Silver Layer_** in Medallion architecture

In [0]:
df.write.format("delta").option("mergeSchema", "true").mode("overwrite").save("/mnt/silver/")

In [0]:
spark.sql("CREATE SCHEMA IF NOT EXISTS silver");
df.write.format("delta").option("mergeSchema","true").mode("overwrite").saveAsTable("silver.transformed_data")

# Final Transformation and Visualization of the transformed data 

In [0]:
df = spark.table('silver.transformed_data')
display(df)

In [0]:
display(df.groupBy("Company").count().show(100))

In [0]:
companies_to_exclude = [
    "UT", "CNES", "KCST", "Landspace", "SRC", "ISA", "Douglas", "CAS Space",
    "ASI", "CECLES", "Virgin Galactic", "IRGC", "JAXA", "KARI", "EER", "Armï¿½e de l'Air",
    "i-Space", "OneSpace", "Galactic Energy", "Firefly", "IAI", "CASIC", "Khrunichev",
    "MITT", "AMBA", "AEB", "Virgin Orbit", "Land Launch", "RAE", "OKB-586", "Yuzhmash",
    "GK LS", "Astra", "Exos", "Eurockot","Starsem","US Navy","ESA","ExPace","Blue Origin",
    "Kosmotras","Rocket Lab","ISAS","Sea Launch","ILS","Roscosmos","Lockheed","MHI","Northrop","RVSN USSR","CASC"
]

df = df.filter(~df["Company"].isin(companies_to_exclude))
display(df.groupBy("Company").count())



In [0]:
df.write.format("delta").mode("overwrite").save("/mnt/gold/target/")

In [0]:
df.write.format("delta").option("mergeSchema", "true").mode("overwrite").saveAsTable("gold.target_data")

In [0]:
display(df)

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

In [0]:
# Step 1: Filter SpaceX launches
space_x_data = df.filter(df["Company"] == "SpaceX")

# Step 2: Group by Rocket and calculate total + successful launches
grouped_by_rocket = space_x_data.groupBy("Rocket","Country","City").agg(
    F.count("*").alias("Total_Launches"),
    F.count(F.when(F.col("MissionStatus") == "Success", True)).alias("Successful_Launches")
)

# Step 3: Calculate success rate
grouped_by_rocket = grouped_by_rocket.withColumn(
    "Success_Rate",
    (F.col("Successful_Launches") / F.col("Total_Launches"))*100
)

display(grouped_by_rocket.printSchema())

In [0]:
grouped_by_rocket.write.format("delta").mode("overwrite").save("/mnt/gold/space_x_rockets")

In [0]:
spark.sql("CREATE SCHEMA IF NOT EXISTS gold")
grouped_by_rocket.write.format("delta").mode("overwrite").saveAsTable("gold.space_x_rockets")

In [0]:
space_x_rockets = spark.table("gold.space_x_rockets")
display(space_x_rockets)

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.