# In [1]:
import datetime
import glob
import os
import shutil
import sys

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

# Author: Group1 
# Created: 10th May 2023
# Last Modified: 10th May 2023

# Description: This PySpark application converts Zomato JSON files into tab-delimited CSV files.
# Reference: Project SRS document

PROJECT_PATH = "/home/talentum/zomato_etl"

# Nested fields pulled out of every exploded "restaurant" struct, in output
# column order.
SOURCE_COLUMNS = (
    "res.R.res_id",
    "res.name",
    "res.location.country_id",
    "res.location.city",
    "res.location.address",
    "res.location.locality",
    "res.location.locality_verbose",
    "res.location.longitude",
    "res.location.latitude",
    "res.cuisines",
    "res.average_cost_for_two",
    "res.currency",
    "res.has_table_booking",
    "res.has_online_delivery",
    "res.is_delivering_now",
    "res.switch_to_order_menu",
    "res.price_range",
    "res.user_rating.aggregate_rating",
    "res.user_rating.rating_text",
    "res.user_rating.votes",
)

# Column names applied to the selected fields; positionally aligned with
# SOURCE_COLUMNS.
OUTPUT_HEADERS = (
    "Restaurant ID",
    "Restaurant Name",
    "Country Code",
    "City",
    "Address",
    "Locality",
    "Locality Verbose",
    "Longitude",
    "Latitude",
    "Cuisines",
    "Average Cost for two",
    "Currency",
    "Has Table booking",
    "Has Online delivery",
    "Is delivering now",
    "Switch to order menu",
    "Price range",
    "Aggregate rating",
    "Rating text",
    "Votes",
)

BANNER_LINE = "**********************************************************************"


def list_json_files(input_dir):
    """Return the sorted names of the ``.json`` entries directly in *input_dir*.

    Raises:
        FileNotFoundError: if *input_dir* does not exist.
    """
    return sorted(name for name in os.listdir(input_dir) if name.endswith(".json"))


def clear_directory(directory):
    """Delete every non-hidden entry of *directory*, keeping the directory itself.

    This mirrors ``rm -rf <directory>/*`` (the ``*`` glob skips dot-files and a
    missing directory is a no-op) without spawning a shell.  Unlike the shell
    call, a failed deletion raises ``OSError`` instead of being ignored; that
    matters because the CSV output is written in append mode, so stale files
    would silently duplicate rows.
    """
    for entry in glob.glob(os.path.join(directory, "*")):
        # A symlink to a directory is unlinked, not followed, as `rm -rf` does.
        if os.path.isdir(entry) and not os.path.islink(entry):
            shutil.rmtree(entry)
        else:
            os.remove(entry)


def report_failure(logger, message, error):
    """Print the red console banner for *message* and log *error* through *logger*."""
    print(BANNER_LINE)
    print(f"\033[91m{message}\033[0m")
    print(BANNER_LINE)
    # The banner alone tells the operator to "refer logs", so make sure the
    # actual exception is recorded there.
    logger.error(f"{message}: {error!r}")


def convert_file(spark, logger, input_dir, file_name, target_uri):
    """Read one JSON file, flatten its restaurants and append them to *target_uri*.

    Args:
        spark: active SparkSession.
        logger: log4j logger obtained from the Spark JVM gateway.
        input_dir: local directory that holds *file_name*.
        file_name: name of the JSON file to convert.
        target_uri: output location passed to the CSV writer.
    """
    logger.warn(f"{file_name} Read")
    df = spark.read.json("file://" + input_dir + "/" + file_name)
    # One output row per element of the restaurants.restaurant array.
    res_df = df.select(F.explode(df.restaurants.restaurant).alias("res"))
    logger.info('-----Exploded column restaurants.restaurant')

    data = res_df.select(*SOURCE_COLUMNS)
    logger.info('-----Selected required columns')

    # Rename the columns. The writer below does not set the "header" option,
    # so these names only affect the DataFrame schema.
    data = data.toDF(*OUTPUT_HEADERS)
    logger.info('-----Header added to DataFrame')

    logger.info('---------------------------Printing SCHEMA----------------------------')
    # printSchema() writes to stdout and returns None, so log the schema text
    # explicitly instead of logging its return value.
    res_df.printSchema()
    logger.info(res_df.schema.simpleString())

    data.write.mode("append").option("delimiter", "\t").csv(target_uri)
    logger.warn(f"{file_name} Wrote")


def main():
    """Convert every JSON file under the project's source/json into tab-delimited CSV.

    Returns:
        0 when all files were converted, 1 when an error was caught and reported.
    """
    # defining SparkSession object
    spark = SparkSession.builder.master("yarn").appName("Json-to-CSV").getOrCreate()
    sc = spark.sparkContext
    # Defining logger: log4j is reached through the SparkContext's JVM gateway.
    log4j = sc._jvm.org.apache.log4j
    logger = log4j.LogManager.getLogger("jsontocsv")
    app_id = sc.applicationId

    status = 1
    try:
        logger.info(f'-----appID is : {app_id}')

        # Set the input and output directories
        input_dir = PROJECT_PATH + "/source/json"
        csv_dir = PROJECT_PATH + "/source/csv"
        output_dir = "file://" + csv_dir
        logger.warn("Input and Output Directory Set")

        # listing all the json files in the given directory
        json_files = list_json_files(input_dir)

        # removing existing csv files so the append-mode writes start clean
        clear_directory(csv_dir)

        logger.info(f'-----json file to loaded to spark from path: {input_dir}')

        for file_name in json_files:
            convert_file(spark, logger, input_dir, file_name, f"{output_dir}/zomato_csv")
        status = 0

    # FileNotFoundError is a subclass of OSError (IOError is an alias of
    # OSError), so it has to be handled first to be reachable.
    except FileNotFoundError as e:
        report_failure(logger, "Caught File Not Found Exception!", e)
    except OSError as e:
        report_failure(logger, "Had an IOException trying to read that file", e)
    except IndexError as e:
        report_failure(
            logger,
            "Caught Array Index Out Of Bounds Exception, Kindly Input Parameters On Invoking This Script",
            e,
        )
    except Exception as e:
        report_failure(logger, "Caught an Error, Kindly Refer Logs. Failed Status Updated!", e)
    finally:
        # Release the Spark application on failure as well as on success.
        spark.stop()
    return status


if __name__ == "__main__":
    exit_status = main()
    # Report failures through the exit status so a calling script can detect them.
    if exit_status:
        sys.exit(exit_status)
