In [0]:
# Client ID, client secret and tenant ID are read from the "masterclass"
# secret scope so that no credential is hard-coded in the notebook.
client_id = dbutils.secrets.get(scope="masterclass", key="client")
client_secret = dbutils.secrets.get(scope="masterclass", key="clientsecret")
tenant_id = dbutils.secrets.get(scope="masterclass", key="tenant")

# OAuth client-credentials settings for ABFS; the same dict is reused by the
# other mount calls in this notebook.
temps = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": client_id,
    "fs.azure.account.oauth2.client.secret": client_secret,
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/" + tenant_id + "/oauth2/token",
}

# dbutils.fs.mount raises if the mount point is already in use, which made
# this cell fail on every re-run. Only mount when /mnt/demo is absent.
if not any(m.mountPoint == "/mnt/demo" for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source="abfss://demo@data1007.dfs.core.windows.net/",
        mount_point="/mnt/demo",
        extra_configs=temps,
    )


In [0]:
# Mount the bronze, silver and gold containers of the data1007 storage
# account under /mnt/<container>, using the OAuth settings held in `temps`.
# Mount points that already exist are skipped, because dbutils.fs.mount
# raises when asked to mount over an existing mount point (which would
# otherwise break a re-run of this cell).
already_mounted = {m.mountPoint for m in dbutils.fs.mounts()}

for container in ("bronze", "silver", "gold"):
    mount_point = f"/mnt/{container}"
    if mount_point in already_mounted:
        continue
    dbutils.fs.mount(
        source=f"abfss://{container}@data1007.dfs.core.windows.net/",
        mount_point=mount_point,
        extra_configs=temps,
    )

In [0]:
# Load the ingested CSV from the demo mount: the first row is treated as the
# header and column types are inferred from the data.
data = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/mnt/demo/ingested_data.csv")
)

In [0]:
# Render the freshly loaded DataFrame for a visual check.
display(data)

In [0]:
# Persist the raw DataFrame as Delta files under the bronze mount, replacing
# whatever is already stored there.
data.write.save("/mnt/bronze", format="delta", mode="overwrite")

In [0]:
# Ensure the bronze schema exists, then (re)write the raw data into it as the
# Delta table bronze.raw_data.
spark.sql("create schema if not exists bronze")
data.write.saveAsTable("bronze.raw_data", format="delta", mode="overwrite")


In [0]:
# Reload the working DataFrame from the bronze table and show it.
data = spark.read.table("bronze.raw_data")
display(data)

In [0]:
# Remove rows that are exact duplicates across every column.
data = data.distinct()
display(data)

In [0]:
# Discard rows whose Time value is null.
data = data.na.drop(subset=["Time"])
display(data)


In [0]:
# The Price column is not used further on, so remove it.
data = data.drop(
    "Price",
)
display(data)


In [0]:
%python
from pyspark.sql.functions import to_date, col, hour, when
from pyspark.sql import functions as F

# removing the new line tag(/n)
data = data.withColumn("MissionStatus", F.trim(col("MissionStatus")))

# Converting the Date column into date format
data = data.withColumn("Date", to_date(col("Date")))


# Extracting hour from Time column and creating a new column TimeOfDay
data = data.withColumn("Hour", hour(col("Time")))

# Creating a new column TimeOfDay based on the hour
data = data.withColumn(
    "TimeOfDay",
    when(col("Hour").between(0, 11), "Morning")
    .when(col("Hour").between(12, 16), "Afternoon")
    .otherwise("Night")
)

# Dropping the intermediate Hour column
data = data.drop("Hour")

display(data)

In [0]:
# Persist the cleansed DataFrame as Delta files under the silver mount,
# replacing whatever is already stored there.
data.write.save("/mnt/silver", format="delta", mode="overwrite")

In [0]:
# Ensure the silver schema exists, then (re)write the cleansed data into it
# as the Delta table silver.cleansed_data.
spark.sql("create schema if not exists silver")
data.write.saveAsTable("silver.cleansed_data", format="delta", mode="overwrite")


In [0]:
# Reload the working DataFrame from the silver table.
data = spark.read.table("silver.cleansed_data")

In [0]:
from pyspark.sql.functions import split, size, element_at

# Preview the distinct country names. The last ", "-separated part of
# Location is assumed to be the country.
#
# `data` is deliberately NOT modified here. Overwriting Location with the
# country at this point destroyed the full location string, so the later
# City extraction (second-to-last part of Location) could only ever yield
# NULL on a clean top-to-bottom run.
country_preview = data.select(
    element_at(split(data["Location"], ", "), -1).alias("Location")
)

display(country_preview.distinct().limit(20))

In [0]:
from pyspark.sql import functions as F

# Split Location on ", " once and reuse the resulting array for both columns.
location_parts = F.split(F.col("Location"), ", ")

# The last part is assumed to be the country.
country = F.element_at(location_parts, -1)

# The second-to-last part is taken as the city. element_at raises for an
# out-of-range index when ANSI mode is enabled, so locations with fewer than
# two parts are guarded explicitly and get a NULL City instead.
city = F.when(F.size(location_parts) >= 2, F.element_at(location_parts, -2))

data = (
    data
    # City must be derived before Location is overwritten below.
    .withColumn("City", city)
    # Location now holds only the country, with "USA" spelled out.
    .withColumn(
        "Location",
        F.when(country == "USA", "United States").otherwise(country),
    )
)

# Preview cleaned locations
display(data)

In [0]:
from pyspark.sql import functions as F

# Break Date into separate year, month and day-of-month columns.
data = (
    data
    .withColumn("year", F.year("Date"))
    .withColumn("month", F.month("Date"))
    .withColumn("day", F.dayofmonth("Date"))
)
display(data)

In [0]:
# Date has been split into year/month/day above, so the original column is
# removed.
data = data.drop(
    "Date",
)
display(data)


In [0]:
from pyspark.sql.functions import col

# Companies left out of the per-company launch count below.
# NOTE(review): "Arm�e de l'Air" contains a replacement character; it
# presumably mirrors how the value appears in the loaded data -- confirm
# before "correcting" it, otherwise the filter would stop matching.
companies_to_exclude = [
    "UT", "CNES", "KCST", "Landspace",
    "SRC", "ISA", "Douglas", "CAS Space",
    "ASI", "CECLES", "Virgin Galactic", "IRGC",
    "JAXA", "KARI", "EER", "Arm�e de l'Air",
    "i-Space", "OneSpace", "Galactic Energy", "Firefly",
    "IAI", "CASIC", "Khrunichev", "MITT",
    "AMBA", "AEB", "Virgin Orbit", "Land Launch",
    "RAE", "OKB-586", "Yuzhmash", "GK LS",
    "Astra", "Exos", "Eurockot", "Starsem",
    "ESA", "ExPace", "Blue Origin", "Kosmotras",
    "Rocket Lab", "ISAS", "Sea Launch", "ILS",
    "Roscosmos", "Lockheed", "MHI",
]

# Keep only rows whose Company is not in the list, then count rows per company.
is_excluded = col("Company").isin(companies_to_exclude)
remaining_launches = data.where(~is_excluded)
filtered_data = remaining_launches.groupBy("Company").count()

# Show the launch count per remaining company.
display(filtered_data)


In [0]:
# Render the full DataFrame; the Databricks visualizations below this cell are attached to this output.
display(data)

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql.functions import col

# How many of the most-launched rockets to keep. The original comment said
# "Top 10" while the code took 20; the code's value is kept and named here.
TOP_N_ROCKETS = 20

# Rank rockets by total launch count. Rocket name is used as a tie-breaker so
# that the selection is deterministic when several rockets share a count.
top_rockets = (
    data.groupBy("Rocket")
    .count()
    .orderBy(col("count").desc(), col("Rocket").asc())
    .limit(TOP_N_ROCKETS)
)
# Collecting is cheap here: at most TOP_N_ROCKETS rows reach the driver.
top_rockets_list = [row["Rocket"] for row in top_rockets.collect()]

# Keep only launches of those rockets.
filtered_data = data.filter(col("Rocket").isin(top_rockets_list))

# Launch count per rocket and mission status.
filtered_data.groupBy("Rocket", "MissionStatus").count().display()


Databricks visualization. Run in Databricks to view.

In [0]:
%python
from pyspark.sql import functions as F

# Step 1: Filter SpaceX launches
space_x_data = data.filter(data["Company"] == "SpaceX")

# Step 2: Group by Rocket and calculate total + successful launches
grouped_by_rocket = space_x_data.groupBy("Rocket", "Location","City").agg(
    F.count("*").alias("Total_Launches"),
    F.count(F.when(F.col("MissionStatus") == "Success", True)).alias("Successful_Launches")
)

# Step 3: Calculate success rate
grouped_by_rocket = grouped_by_rocket.withColumn(
    "Success_Rate",
    (F.col("Successful_Launches") / F.col("Total_Launches")) * 100
)

display(grouped_by_rocket)

Databricks visualization. Run in Databricks to view.

In [0]:
# (empty cell -- a stray "+" here was a syntax error on Run All)