In [0]:
# OAuth (service principal) settings used by the ABFS driver for this mount.
# NOTE(review): the client id and secret are inline literals. They should be read
# from a Databricks secret scope (dbutils.secrets.get) instead of living in the
# notebook; the scope/key names are not visible here, so the values are unchanged.
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": "client.id",
    "fs.azure.account.oauth2.client.secret": 'secret',
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/-----/oauth2/token",
}

mount_point = "/mnt/exporteddatast"

# Unmount only when the mount point currently exists: dbutils.fs.unmount raises
# for a path that is not mounted, which would abort a run on a fresh workspace.
if any(m.mountPoint == mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.unmount(mount_point)

# Optionally, you can add <directory-name> to the source URI of your mount point.
# The mount call is the cell's last expression, so its result is shown as output.
dbutils.fs.mount(
  source = "abfss://exporteddata@projectstorageaccount099.dfs.core.windows.net/",
  mount_point = mount_point,
  extra_configs = configs)

/mnt/exporteddatast has been unmounted.


True

In [0]:
# List the top-level entries of the mounted container; the listing is the
# cell's last expression and is therefore rendered as the cell output.
mount_root = "/mnt/exporteddatast/"
dbutils.fs.ls(mount_root)


[FileInfo(path='dbfs:/mnt/exporteddatast/dbo/', name='dbo/', size=0, modificationTime=1726880526000),
 FileInfo(path='dbfs:/mnt/exporteddatast/output/', name='output/', size=0, modificationTime=1726963584000)]

In [0]:
# Load the three exported tables as Spark DataFrames.
# The paths use /mnt/exporteddatast, the mount point this notebook creates and
# lists (its `dbo/` folder is visible in the listing). The previous paths pointed
# at /mnt/exporteddata, which this notebook never mounts.
dbo_root = "/mnt/exporteddatast/dbo"

cars_df = spark.read.format("parquet").load(f"{dbo_root}/Cars")
countries_df = spark.read.format("parquet").load(f"{dbo_root}/Countries")
movies_df = spark.read.format("parquet").load(f"{dbo_root}/Movies")
                                        



In [0]:
# Render each loaded table in turn: cars, then countries, then movies.
for frame in (cars_df, countries_df, movies_df):
    display(frame)


ID,Name,Model,Length
1,Acura,MDX,4451
2,Acura,RSX Type S 2dr,2778
3,Acura,TSX 4dr,3230
4,Acura,TL 4dr,3575
5,Acura,3.5 RL 4dr,3880
6,Acura,3.5 RL w/Navigation 4dr,3893
7,Acura,NSX coupe 2dr manual S,3153
8,Audi,A4 1.8T 4dr,3252
9,Audi,A41.8T convertible 2dr,3638
10,Audi,A4 3.0 4dr,3462


ID,Name
1,Australia
2,Austria
3,Brazil
4,Canada
5,China
6,Czech Republic
7,France
8,Germany
9,Indonesia
10,Israel


ID,Title,Genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
7,Sabrina (1995),Comedy|Romance
8,Tom and Huck (1995),Adventure|Children
9,Sudden Death (1995),Action
10,GoldenEye (1995),Action|Adventure|Thriller


In [0]:
# Build null-free copies of each table by discarding rows where any of the
# listed key columns is null. The source frames themselves are left unchanged.
# NOTE(review): the cleaned_* frames are not referenced by any later cell in
# this notebook as shown; confirm whether downstream steps should use them.
cleaned_cars_df = cars_df.dropna(subset=["Name", "Model"])
cleaned_movies_df = movies_df.dropna(subset=["Title", "Genres"])
cleaned_countries_df = countries_df.dropna(subset=["Name"])


In [0]:
# Replace nulls with per-column defaults (each frame is rebound to the result):
#   cars.Length     -> 0
#   movies.Genres   -> 'Unknown'
#   countries.Name  -> 'Unknown'
cars_df = cars_df.fillna({'Length': 0})
movies_df = movies_df.fillna({'Genres': 'Unknown'})
countries_df = countries_df.fillna({'Name': 'Unknown'})


In [0]:
# Ensure the Length column is stored as an integer, replacing it in place.
length_as_int = cars_df["Length"].astype("int")
cars_df = cars_df.withColumn("Length", length_as_int)


In [0]:
# De-duplicate cars on the (Name, Model) pair and movies on Title; one row is
# kept per distinct key.
# NOTE(review): Name is trimmed in a later cell, so names differing only by
# surrounding whitespace are still treated as distinct here.
cars_df = cars_df.drop_duplicates(["Name", "Model"])
movies_df = movies_df.drop_duplicates(["Title"])


In [0]:
# Give each table's generic 'ID' column a table-specific name:
#   cars -> Car_ID, countries -> Country_ID, movies -> Movie_ID.
cars_df, countries_df, movies_df = (
    frame.withColumnRenamed("ID", new_id)
    for frame, new_id in (
        (cars_df, "Car_ID"),
        (countries_df, "Country_ID"),
        (movies_df, "Movie_ID"),
    )
)


In [0]:
# Keep only cars whose Length is strictly positive (this removes the 0
# placeholder written for missing values).
has_length = cars_df["Length"] > 0
filtered_cars_df = cars_df.where(has_length)

# Keep only movies whose Genres is not the 'Unknown' placeholder.
has_genre = movies_df["Genres"] != 'Unknown'
filtered_movies_df = movies_df.where(has_genre)


In [0]:
from pyspark.sql.functions import when

# Label each car "Heavy" when Length exceeds 4000, otherwise "Light", and
# store the label in a new Weight_Category column.
# NOTE(review): the category is named for weight but is derived from the
# Length column; confirm which measurement that column actually holds.
weight_label = when(cars_df["Length"] > 4000, "Heavy").otherwise("Light")
cars_df = cars_df.withColumn("Weight_Category", weight_label)


In [0]:
from pyspark.sql.functions import trim, upper

# Upper-case every movie title, overwriting the Title column.
title_upper = upper(movies_df["Title"])
movies_df = movies_df.withColumn("Title", title_upper)

# Strip leading and trailing spaces from the car Name column.
name_trimmed = trim(cars_df["Name"])
cars_df = cars_df.withColumn("Name", name_trimmed)


In [0]:
from pyspark.sql.functions import count

# Per-brand tally: group cars by Name and count the non-null Car_ID values in
# each group, exposing the result as Car_Count.
cars_per_brand = count("Car_ID").alias("Car_Count")
car_count_by_brand = cars_df.groupBy("Name").agg(cars_per_brand)


In [0]:
# Export the cars table as CSV with a header row under the mounted output
# folder, replacing any previous export. The frame is reduced to a single
# partition before writing.
single_partition_cars = cars_df.repartition(1)
csv_writer = single_partition_cars.write.mode("overwrite").option("header", "true")
csv_writer.csv("dbfs:/mnt/exporteddatast/output/cars")
