In [0]:
!pip install azure-storage-file-datalake
!pip install pycryptodome

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m
[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
from pyspark.sql.functions import *
from functools import reduce
from pyspark.sql.window import Window
from pyspark.sql.types import *
import json
import chardet
from azure.storage.filedatalake import DataLakeServiceClient, FileSystemClient, DataLakeFileClient
from Crypto.Cipher import AES
import base64
import os
 

In [0]:
# Storage credentials are read from the Databricks secret scope, never written in the notebook.
_SECRET_SCOPE = 'az_cs_scope'
CONNECTION_STRING = dbutils.secrets.get(scope=_SECRET_SCOPE, key='conn-str')
ACCOUNT_KEY = dbutils.secrets.get(scope=_SECRET_SCOPE, key='acc-key')

In [0]:
# Storage account name and the file system (container) the JSON file is read from.
ACCOUNT_NAME = "cs0st0rage"
file_system_name = "rawdata"

# Service-level client for Azure Data Lake Storage Gen2; file clients are derived from it.
data_lake_service_client = DataLakeServiceClient.from_connection_string(
    CONNECTION_STRING
)
 
 
def download_and_parse_json(file_system_name, file_path):
    """Download a file from the data lake and parse its content as JSON.

    Args:
        file_system_name: Name of the file system (container) holding the file.
        file_path: Path of the file inside that file system.

    Returns:
        The parsed JSON value, or None when downloading, decoding or parsing
        fails (the error is printed, not raised).
    """
    file_client = data_lake_service_client.get_file_system_client(file_system_name).get_file_client(file_path)
    try:
        # Download the whole file into memory as bytes
        file_data = file_client.download_file().readall()
        # chardet returns the "encoding" key even when detection fails, with
        # the value None, so a dict default would never apply; use `or` so the
        # UTF-8 fallback is actually taken.
        enc_format = chardet.detect(file_data).get("encoding") or "utf-8"
        print(enc_format)
        # Decode the bytes with the detected encoding and load them as JSON
        return json.loads(file_data.decode(enc_format))
    except Exception as e:
        print(f"Failed to download and parse JSON: {e}")
        return None
 
if __name__ == "__main__":
    data = download_and_parse_json(file_system_name, "jsondata/data.json")
    # download_and_parse_json signals failure by returning None; stop here with a
    # clear message instead of failing later on `data.items()`.
    if data is None:
        raise RuntimeError("Could not load jsondata/data.json from the data lake")

ascii


IOStream.flush timed out


In [0]:
# Flat row lists that the two DataFrames below are built from.
records_restaurant = []
records_menu = []


def _collect_restaurant_records(restaurants, full_city_name):
    """Append one restaurant row, plus one row per menu item, for each restaurant.

    Rows are appended to the module-level `records_restaurant` and
    `records_menu` lists.

    Args:
        restaurants: Mapping of restaurant id -> restaurant attribute dict.
        full_city_name: Value stored in the "City" column for these restaurants.
    """
    for restaurant_id, restaurant_data in restaurants.items():
        records_restaurant.append(
            {
                "Restaurant_ID": restaurant_id,
                "Restaurant_Name": restaurant_data.get("name"),
                "City": full_city_name,
                "Rating": restaurant_data.get("rating"),
                "Rating_Count": restaurant_data.get("rating_count"),
                "Cost": restaurant_data.get("cost"),
                "Cuisine": restaurant_data.get("cuisine"),
                "Lic_No": restaurant_data.get("lic_no"),
                "Link": restaurant_data.get("link"),
                "Address": restaurant_data.get("address"),
            }
        )

        # Menu is nested as category -> item name -> item attributes
        for category, items in restaurant_data.get("menu", {}).items():
            for item_name, item_data in items.items():
                records_menu.append(
                    {
                        "Restaurant_ID": restaurant_id,
                        "Category": category,
                        "Item_Name": item_name,
                        "Price": item_data.get("price"),
                        "Veg_or_Non_Veg": item_data.get("veg_or_non_veg")
                    }
                )


for city_name, city_data in data.items():
    if not isinstance(city_data, dict):
        continue

    if "restaurants" in city_data:
        # Restaurants sit directly under the city
        _collect_restaurant_records(city_data.get("restaurants", {}), city_name)
    else:
        # Otherwise every key of the city is treated as a sub-area with its own restaurants
        for sub_area_name, sub_area_data in city_data.items():
            _collect_restaurant_records(
                sub_area_data.get("restaurants", {}),
                f"{sub_area_name},{city_name}",
            )

df_restaurant = spark.createDataFrame(records_restaurant)
df_menu = spark.createDataFrame(records_menu)
# The parsed JSON is no longer needed once both DataFrames exist
del data

In [0]:
# df_restaurant.display()
# df_menu.display()

#### Shape of Data

In [0]:
# print(df_restaurant.count())
# print(len(df_restaurant.columns))

In [0]:
# print(df_menu.count())
# print(len(df_menu.columns))

#### NULL Handling
  + Restaurant DF

In [0]:
# Per column, count the null cells. `sum` is pyspark.sql.functions.sum (star import), not the builtin.
null_flag_sums = [
    sum(col(name).isNull().cast("int")).alias(name)
    for name in df_restaurant.columns
]
null_counts = df_restaurant.select(*null_flag_sums)
null_counts.show()

+-------+----+-----+-------+------+----+------+------------+-------------+---------------+
|Address|City| Cost|Cuisine|Lic_No|Link|Rating|Rating_Count|Restaurant_ID|Restaurant_Name|
+-------+----+-----+-------+------+----+------+------------+-------------+---------------+
|  25602|   0|25602|  25602| 25602|   0|     0|       25602|            0|              0|
+-------+----+-----+-------+------+----+------+------------+-------------+---------------+



In [0]:
# Remove every row that has a null in any column
df_restaurant = df_restaurant.na.drop(how="any")

In [0]:
# Per column, count the cells that hold the literal placeholder string 'NA'
na_flag_sums = []
for name in df_restaurant.columns:
    is_na_flag = when(col(name) == 'NA', 1).otherwise(0)
    na_flag_sums.append(sum(is_na_flag).alias(name))
rows_with_na = df_restaurant.select(*na_flag_sums)
rows_with_na.show()

+-------+----+----+-------+------+----+------+------------+-------------+---------------+
|Address|City|Cost|Cuisine|Lic_No|Link|Rating|Rating_Count|Restaurant_ID|Restaurant_Name|
+-------+----+----+-------+------+----+------+------------+-------------+---------------+
|    106|   0| 151|    119|   253|   0|   106|         106|            0|            106|
+-------+----+----+-------+------+----+------+------------+-------------+---------------+



In [0]:
# Drop rows whose name is the 'NA' placeholder. The column is referenced with its exact
# casing so the filter does not depend on case-insensitive column resolution.
df_restaurant = df_restaurant.filter(col('Restaurant_Name') != 'NA')

In [0]:
# Strip the rupee sign from Cost
cost_without_symbol = regexp_replace(col('Cost'), '₹', '')
df_restaurant = df_restaurant.withColumn('Cost', cost_without_symbol)

In [0]:
# Rows whose Cost is present and is not the "NA" placeholder
has_cost = col("Cost").isNotNull() & (col("Cost") != "NA")

# Approximate median Cost per city, computed from those rows only
city_medians = (
    df_restaurant
    .where(has_cost)
    .groupBy("City")
    .agg(expr('percentile_approx(Cost, 0.5)').alias('median_cost'))
)

# Attach each city's median to every restaurant of that city
df_with_median = df_restaurant.join(city_medians, on="City", how="left")

# Keep Cost where it casts to int, otherwise take the city median; then drop the helper column
imputed_cost = coalesce(col("Cost").cast("int"), col("median_cost"))
df_restaurant = df_with_median.withColumn("Cost", imputed_cost).drop("median_cost")


In [0]:
# Replace the placeholder values "NA" / "license" in Lic_No with a string of 14 zeros
is_placeholder_lic = col("Lic_No").isin("NA", "license")
df_restaurant = df_restaurant.withColumn(
    "Lic_No",
    when(is_placeholder_lic, "00000000000000").otherwise(col("Lic_No")),
)

In [0]:
# Per column, count the cells equal to the '--' placeholder and print the result
dash_counts = [
    count(when(col(name) == '--', 1)).alias(name)
    for name in df_restaurant.columns
]
df_restaurant.select(*dash_counts).show()

+----+-------+----+-------+------+----+------+------------+-------------+---------------+
|City|Address|Cost|Cuisine|Lic_No|Link|Rating|Rating_Count|Restaurant_ID|Restaurant_Name|
+----+-------+----+-------+------+----+------+------------+-------------+---------------+
|   0|      0|   0|      0|     0|   0| 90784|           0|            0|              0|
+----+-------+----+-------+------+----+------+------------+-------------+---------------+



In [0]:
# Replace the '--' placeholder in Rating with 0; every other value is kept as is
is_dash_rating = col("Rating") == '--'
df_restaurant = df_restaurant.withColumn(
    'Rating',
    when(is_dash_rating, 0).otherwise(col("Rating")),
)

In [0]:
# Convert the textual Rating_count into an integer:
#   "<n>+ ratings"             -> n
#   "<n>K+ ratings"            -> n * 1000 (n may carry decimals, e.g. "1.5K+ ratings")
#   "NA" / "Too Few Ratings"   -> 0
#   anything else              -> null
rating_text = col('Rating_count')
df_restaurant = df_restaurant.withColumn(
    'Rating_count',
    when(
        rating_text.rlike('^\\d+\\+ ratings$'),
        regexp_replace(rating_text, '\\+ ratings', '').cast('int')
    )
    .when(
        rating_text.rlike('^\\d+(\\.\\d+)?K\\+ ratings$'),
        # Cast explicitly instead of relying on implicit string * int arithmetic, and
        # round before the int cast so e.g. 2.3 * 1000 does not truncate to 2299.
        # `round` is pyspark.sql.functions.round (star import), not the builtin.
        round(
            regexp_replace(rating_text, 'K\\+ ratings', '').cast('double') * 1000
        ).cast('int')
    )
    .when(
        rating_text.isin('NA', 'Too Few Ratings'),
        0
    )
    .otherwise(None)
)

  + Menu DF

In [0]:
# df_menu.select([sum(col(c).isNull().cast("int")).alias(c) for c in df_menu.columns]).show()

In [0]:
# df_menu.select([sum(when(col(c)=='NA',1).otherwise(0)).alias(c) for c in df_menu.columns]).show()

In [0]:
# df_menu.select([count(when(col(c) == '--', 1)).alias(c) for c in df_menu.columns]).show()

#### Duplicate Data Handling
  + Restaurant DF

In [0]:
# Number of Restaurant_ID values that occur on more than one row
(
    df_restaurant
    .groupBy('Restaurant_ID')
    .count()
    .where(col("count") > 1)
    .count()
)

5935

In [0]:
# Keep a single row per Restaurant_ID
restaurant_key = ['Restaurant_ID']
df_restaurant = df_restaurant.dropDuplicates(restaurant_key)

  + Menu DF

In [0]:
# Number of (Restaurant_ID, Category, Item_Name) combinations that occur more than once
(
    df_menu
    .groupBy('Restaurant_ID', 'Category', 'Item_Name')
    .count()
    .where(col("count") > 1)
    .count()
)

559515

In [0]:
# Keep a single row per (Restaurant_ID, Category, Item_Name)
menu_key = ['Restaurant_ID', 'Category', 'Item_Name']
df_menu = df_menu.dropDuplicates(menu_key)

#### Removing unnecessary columns

In [0]:
# NOTE(review): 'City_Link' is not among the columns created when the JSON is flattened;
# drop() ignores names that are absent, so this looks like a no-op - confirm the intent.
unneeded_columns = ['City_Link']
df_restaurant = df_restaurant.drop(*unneeded_columns)

#### Defining Schema of DataFrames

In [0]:
# Print the current column names and types of df_restaurant
df_restaurant.printSchema()

root
 |-- City: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Cost: double (nullable = true)
 |-- Cuisine: string (nullable = true)
 |-- Lic_No: string (nullable = true)
 |-- Link: string (nullable = true)
 |-- Rating: string (nullable = true)
 |-- Rating_count: integer (nullable = true)
 |-- Restaurant_ID: string (nullable = true)
 |-- Restaurant_Name: string (nullable = true)



In [0]:
# Target column types for the restaurant data. Only the columns listed here are kept
# by the select below; "Link" is not listed, so it is dropped.
new_schema = StructType([
    StructField("Address", StringType(), True),
    StructField("City", StringType(), True),
    StructField("Cost", IntegerType(), True),
    StructField("Cuisine", StringType(), True),
    StructField("Lic_No", StringType(), True),
    StructField("Rating", DoubleType(), True),
    StructField("Rating_count", IntegerType(), True),
    StructField("Restaurant_ID", IntegerType(), True),
    StructField("Restaurant_Name", StringType(), True)
])

# Cast every column to its target type. The alias pins the output name to the schema's
# field name, matching how the menu columns are cast.
df_restaurant = df_restaurant.select(
    [col(c.name).cast(c.dataType).alias(c.name) for c in new_schema.fields]
)

In [0]:
# print(df_restaurant.count())
# len(df_restaurant.columns)

In [0]:
# Print the current column names and types of df_menu
df_menu.printSchema()

root
 |-- Category: string (nullable = true)
 |-- Item_Name: string (nullable = true)
 |-- Price: string (nullable = true)
 |-- Restaurant_ID: string (nullable = true)
 |-- Veg_or_Non_Veg: string (nullable = true)



In [0]:
# Target column types for the menu data (this reuses the name `new_schema`)
new_schema = (
    StructType()
    .add("Category", StringType(), True)
    .add("Item_Name", StringType(), True)
    .add("Price", DoubleType(), True)
    .add("Restaurant_ID", IntegerType(), True)
    .add("Veg_or_Non_Veg", StringType(), True)
)

# Cast each listed column to its target type, keeping the field name as the column name
typed_menu_columns = [
    col(menu_field.name).cast(menu_field.dataType).alias(menu_field.name)
    for menu_field in new_schema.fields
]
df_menu = df_menu.select(*typed_menu_columns)

In [0]:
# print(df_menu.count())
# len(df_menu.columns)

#### Joining Restaurant and Menu

In [0]:
# Collapse the menu to one row per restaurant: every item becomes a struct inside
# the Menu_Items array
menu_item = struct('Category', 'Item_Name', 'Price', 'Restaurant_ID', 'Veg_or_Non_Veg')
menu_items = collect_list(menu_item).alias('Menu_Items')
df_menu = df_menu.groupBy('Restaurant_ID').agg(menu_items)

In [0]:
# Left join keeps every restaurant, including those without any menu rows
df_final = df_restaurant.join(df_menu, ["Restaurant_ID"], "left")

#### Encryption of Lic_No

In [0]:
# AES encryption function
def encrypt_lic_no(lic_no, key):
    """Encrypt a licence number with AES in ECB mode and return it as Base64 text.

    Args:
        lic_no: Plain-text licence number (str), or None.
        key: Secret key (str); it is space-padded / truncated to 16 characters.

    Returns:
        Base64 text of the ciphertext, or None when `lic_no` is None or when
        encryption fails for any reason (errors are swallowed, not raised).

    Notes:
        * The plaintext is padded with spaces, so trailing spaces in `lic_no`
          cannot be told apart from padding after decryption.
        * NOTE(review): ECB is deterministic (equal inputs give equal outputs).
          Changing the mode would make previously written values unreadable,
          so it is left as is here.
    """
    if lic_no is None:
        return None
    try:
        # Ensure the key is exactly 16 characters long before encoding
        key = key.ljust(16)[:16].encode('utf-8')
        cipher = AES.new(key, AES.MODE_ECB)  # AES encryption in ECB mode

        # Pad AFTER encoding: the cipher needs a multiple of 16 *bytes*, and a
        # non-ASCII character takes more than one byte in UTF-8. Padding the
        # str first gave a misaligned buffer for such input and the function
        # returned None. ASCII input produces the same bytes as before.
        plaintext = lic_no.encode('utf-8')
        padded_length = 16 * ((len(plaintext) + 15) // 16)
        encrypted_bytes = cipher.encrypt(plaintext.ljust(padded_length))

        # Encode the encrypted bytes to Base64
        return base64.b64encode(encrypted_bytes).decode('utf-8')
    except Exception:
        return None
 
# AES decryption function
def decrypt_lic_no(encrypted_base64, key):
    """Reverse `encrypt_lic_no`: Base64-decode, AES-ECB decrypt and strip the space padding.

    Args:
        encrypted_base64: Base64 text holding the ciphertext, or None.
        key: Secret key (str); it is space-padded / truncated to 16 characters.

    Returns:
        The decrypted text with trailing spaces removed, or None when the
        input is None or any step fails (errors are swallowed, not raised).
    """
    if encrypted_base64 is None:
        return None
    try:
        # Same key preparation as on the encryption side
        aes_key = key.ljust(16)[:16].encode('utf-8')

        # Base64 text -> raw ciphertext bytes
        ciphertext = base64.b64decode(encrypted_base64.encode('utf-8'))

        # ECB decryption of the ciphertext
        plaintext = AES.new(aes_key, AES.MODE_ECB).decrypt(ciphertext)

        # Trailing spaces are the padding added during encryption
        return plaintext.decode('utf-8').rstrip(' ')
    except Exception as e:
        return None
 
# Encryption key, read from the secret scope
encryption_key = dbutils.secrets.get(scope="az_cs_scope", key="lic-no")


def _encrypt_with_key(lic_no):
    """Encrypt one Lic_No value with the notebook's encryption key."""
    return encrypt_lic_no(lic_no, encryption_key)


def _decrypt_with_key(encrypted_text):
    """Decrypt one Lic_No value with the notebook's encryption key."""
    return decrypt_lic_no(encrypted_text, encryption_key)


# UDF wrapper around the encryption function
encrypt_lic_no_udf = udf(_encrypt_with_key, StringType())

# Overwrite Lic_No with its encrypted form
df_final = df_final.withColumn("Lic_No", encrypt_lic_no_udf(col("Lic_No")))

# UDF wrapper around the decryption function (defined but not applied in this notebook)
decrypt_lic_no_udf = udf(_decrypt_with_key, StringType())

# Example of decrypting the column again:
# df1 = df.withColumn("Lic_No", decrypt_lic_no_udf(col("Lic_No")))

In [0]:
# df_final.display()

In [0]:
# Add a "timestamp" column holding the current timestamp
load_timestamp = current_timestamp()
df_final = df_final.withColumn("timestamp", load_timestamp)

In [0]:
# Build the storage endpoint from ACCOUNT_NAME so the account is named in one place only
storage_host = f"{ACCOUNT_NAME}.dfs.core.windows.net"

# Give Spark the account key for that storage account
spark.conf.set(f"fs.azure.account.key.{storage_host}", ACCOUNT_KEY)

# The Delta table lives at the root of the "silver" container
delta_table_path_final = f"abfss://silver@{storage_host}/"

# Append mode: every run adds its rows to whatever the table already holds
df_final.write.format("delta").mode("append").save(delta_table_path_final)