In [None]:
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat, col, lit, split, to_date, date_format, row_number
from pyspark.sql.window import Window
import os

# Create (or reuse) the active SparkSession.
spark = SparkSession.builder.getOrCreate()

# Load the pipeline configuration: input CSV paths, the transformation list and
# the output Parquet path are all read from this file further down.
with open('/content/drive/MyDrive/project/config.json', encoding='utf-8') as f:
    config = json.load(f)

# Both input files are read unconditionally later, so validate both up front:
# each must be a string path ending in '.csv' that exists on disk.
for key in ('input_csv', 'input2_csv'):
    path = config.get(key)
    if not (isinstance(path, str) and path.endswith('.csv') and os.path.exists(path)):
        print(f"No CSV file is selected or the file is invalid ({key}).")
        # `exit()` is meant for the interactive shell only and reports success
        # (status 0); SystemExit(1) stops the run and signals failure.
        raise SystemExit(1)

# Stage both input CSVs through Parquet. Each CSV is read with its first row as
# the header and column types inferred from the data. Each one is then written,
# overwriting, to a staging Parquet path relative to the working directory.
# Finally each is re-read from that path, so df and df2 end up backed by the
# Parquet copies.
csv_keys = ('input_csv', 'input2_csv')
parquet_paths = ('temp.parquet', 'temp2.parquet')

csv_frames = [spark.read.csv(config[key], header=True, inferSchema=True) for key in csv_keys]

for csv_frame, parquet_path in zip(csv_frames, parquet_paths):
    csv_frame.write.mode("overwrite").parquet(parquet_path)

df, df2 = [spark.read.parquet(parquet_path) for parquet_path in parquet_paths]

# Apply the configured column transformations to `df`, in list order.
# Each entry of config['transformations'] is a dict whose 'type' selects one of
# the branches below; the other keys each branch reads are listed with it.
for transformation in config['transformations']:
    kind = transformation['type']
    if kind == 'date_format':
        # Keys: column, input_format, output_format.
        # Parse the column into a date with input_format, then render it back as
        # a string with output_format; both steps overwrite the same column.
        target = transformation['column']
        df = df.withColumn(target, to_date(col(target), transformation['input_format']))
        df = df.withColumn(target, date_format(col(target), transformation['output_format']))
    elif kind == 'concat':
        # Keys: columns, separator, output_column.
        # Put the separator *between* consecutive columns (a, sep, b, sep, c),
        # not once after the last column.
        # concat() yields null if any input column is null (concat_ws would skip
        # nulls instead).
        separator = lit(transformation['separator'])
        pieces = []
        for position, name in enumerate(transformation['columns']):
            if position:
                pieces.append(separator)
            pieces.append(col(name))
        df = df.withColumn(transformation['output_column'], concat(*pieces))
    elif kind == 'split':
        # Keys: column, separator, output_columns.
        # split() interprets the separator as a (Java) regular expression, so
        # characters such as '.' or '|' must be escaped in the config.
        # Element i of the split result is written to output_columns[i].
        split_col = split(df[transformation['column']], transformation['separator'])
        for i, output_column in enumerate(transformation['output_columns']):
            df = df.withColumn(output_column, split_col.getItem(i))
    elif kind == 'drop':
        # Keys: column.
        df = df.drop(transformation['column'])
    else:
        # Unknown types are still skipped, but no longer silently.
        print(f"Skipping unknown transformation type: {kind!r}")

# Attach the 'Industry' column from df2 (staged in temp2.parquet) to df by
# matching on 'sno'. The inner join keeps only 'sno' values present in both
# frames; a df row is repeated once per matching df2 row.
if 'sno' in df.columns and {'Industry', 'sno'} <= set(df2.columns):
    joined_df = df.join(df2.select('Industry', 'sno'), on='sno', how='inner')
else:
    # Without this branch joined_df would be unbound and the write below would
    # raise NameError; save the transformed frame without 'Industry' instead.
    print("Skipping 'Industry' join: required 'sno'/'Industry' columns not found.")
    joined_df = df

# Save the transformed DataFrame to the configured Parquet output...
joined_df.write.mode('overwrite').parquet(config['output_parquet'])

# ...and read the final result back from that output.
final_df = spark.read.parquet(config['output_parquet'])

# Renumber 'sno' as consecutive integers 1..N following the existing 'sno' order.
# The window has no partitionBy, so Spark moves every row into a single partition
# to compute row_number; fine for small data, a bottleneck for large data.
# Rows sharing the same 'sno' receive an arbitrary relative order.
if 'sno' in final_df.columns:
    final_df = final_df.withColumn('sno', row_number().over(Window.orderBy("sno")).cast("integer"))

# Display the DataFrame with aligned sno values
final_df.show()


+---+---------------+---------+------+--------------------+--------------------+-------------+--------------------+-----+---+----+--------------------+
|sno|        User Id|Last Name|   Sex|               Email|               Phone|Date of birth|           Job Title|Month|Day|Year|            Industry|
+---+---------------+---------+------+--------------------+--------------------+-------------+--------------------+-----+---+----+--------------------+
|  1|4defE49671cF860|  Shannon|  Male|   tvang@example.net|   574-440-1423x9799|   2020-07-09|    Technical brewer| 2020| 07|  09|              Sports|
|  2|F89B87bCf8f210b|      Lin|  Male| helen14@example.net|001-273-664-2268x...|   1909-06-20|Teacher, adult ed...| 1909| 06|  20|      Legal Services|
|  3|Cad6052BDd5DEaf|    Blake|Female| brent05@example.org|  927-880-5785x85266|   1964-08-19|Armed forces oper...| 1964| 08|  19|         Hospitality|
|  4|e83E46f80f629CD|  Hoffman|Female|munozcraig@exampl...|001-147-429-8340x608|   2009-