In [2]:
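"""Flatten nested gym JSON data with PySpark and split people by age eligibility.

The program reads in a JSON file containing gym data using the PySpark library. The JSON file contains nested structures (gyms, the people in each gym, and each person's addresses), so the program uses the explode function to flatten the data and select the relevant columns. The program then assigns each person an ID by ranking names alphabetically within their gym (a dense rank over a window partitioned by gym name), so the ID is unique within a gym but restarts at 1 for every gym, and sorts the data based on this ID.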

Next, the program filters the data based on age eligibility (18 years and older) and creates two DataFrames: one for eligible people and one for ineligible people. Finally, the program writes the two DataFrames to CSV files.

Overall, the program is an example of how to use PySpark to read in, manipulate, and analyze data stored in nested JSON structures.
"""
# Import necessary libraries
from pyspark.sql import SparkSession, HiveContext
from pyspark.sql.functions import explode, dense_rank, when
from pyspark.sql.window import Window

# Obtain the Spark session for this job (getOrCreate reuses an active one if present)
spark = (
    SparkSession.builder
    .appName("Gym Data")
    .getOrCreate()
)
# Disabled alternative: the triple-quoted block below is a bare string literal,
# so none of it is executed. It keeps a variant of the read that supplies an
# explicit schema instead of letting Spark infer one.
# NOTE(review): before re-enabling, be aware that
#   - StructType, StructField, ArrayType, LongType and StringType are not among
#     the names imported at the top of this script;
#   - it reads gym.json, whereas the active read further down uses n.json;
#   - "state" and "country" are declared LongType and "city" is declared as an
#     array of structs, while the printed sample output shows plain string
#     values (e.g. CA, USA, Anytown) for those columns — confirm the real shape
#     of the data and adjust the schema accordingly.
"""
schema = StructType([
    StructField("Gyms", ArrayType(
        StructType([
            StructField("id", LongType(), True),
            StructField("Gym_name", StringType(), True),
            StructField("people", ArrayType(
                StructType([
                    StructField("Person_name", StringType(), True),
                    StructField("age", LongType(), True),
                    StructField("addresses", ArrayType(
                        StructType([
                            StructField("type", StringType(), True),
                            StructField("Street", StringType(), True),
                            StructField("city", ArrayType(
                                StructType([
                                StructField("City_name", StringType(), True),
                                StructField("state", LongType(), True),
                                StructField("country", LongType(), True),
                                ])
                        ), True)
                        ])
                    ), True)
                ])
            ), True)
        ])
    ), True)
])
json_data_df = spark.read.option('multiline', 'True').schema(schema).json('/Users/krishnaveni/Desktop/gym.json')
"""
# Load the multi-line JSON document and flatten it one nesting level per step:
#   1. one row per element of the top-level "gyms" array,
#   2. one row per person in each gym (gym id and name carried along),
#   3. one row per address of each person,
#   4. pull the address and city fields up into top-level columns.
json_data_df = (
    spark.read
    .option('multiline', 'True')
    .json('/Users/krishnaveni/Desktop/n.json')
    .select(explode("gyms").alias("gym"))
    .select("gym.id", "gym.Gym_name", explode("Gym.people").alias("person"))
    .select(
        "id",
        "Gym_name",
        "person.Person_name",
        "person.age",
        explode("person.addresses").alias("address"),
    )
    .select(
        "id",
        "Gym_name",
        "Person_name",
        "age",
        "address.type",
        "address.street",
        "address.city.City_name",
        "address.city.state",
        "address.city.country",
    )
)

# Rank people alphabetically inside each gym: rows are grouped by Gym_name and
# ordered by Person_name within the group.
window = Window.partitionBy('Gym_name').orderBy('Person_name')

# dense_rank gives every row with the same Person_name in a gym the same number
# (so a person's several address rows share one ID) without leaving gaps.
# Numbering is per Gym_name partition, i.e. it starts again at 1 for each gym.
person_rank = dense_rank().over(window)
df = json_data_df.withColumn('unique_id', person_rank)

# Show the flattened data together with the new ID column
print("Json file Data")
df.show()

# Collapse the per-address rows to one row per (unique_id, Person_name, age)
# combination, then sort ascending by the ID.
distinct_names = df.select('unique_id', 'Person_name', 'age').distinct()
Ordered_df = distinct_names.orderBy('unique_id', ascending=True)

# Minimum age (inclusive) at which a person counts as eligible.
MIN_ELIGIBLE_AGE = 18

# Flag each person: 'Y' when they are 18 years or older, otherwise 'N'.
# The comparison is inclusive (>=) so that 18-year-olds are eligible, matching
# the "18 years and older" rule stated for this program; the previous strict
# "> 18" wrongly put them in the ineligible set.
# A null age makes the condition null rather than true, so such rows fall
# through to otherwise() and are flagged 'N'.
result_df = Ordered_df.withColumn(
    'is_eligible',
    when(Ordered_df['age'] >= MIN_ELIGIBLE_AGE, 'Y').otherwise('N'),
)

# Split into the two result sets according to the flag
eligible_df = result_df.filter(result_df.is_eligible == 'Y')
not_eligible_df = result_df.filter(result_df.is_eligible == 'N')

# Save both result sets as CSV with a header row, eligible people first.
# No save mode is set here, so Spark's default applies to each target path.
for frame, target in (
    (eligible_df, "/Users/krishnaveni/Desktop/eligible1.csv"),
    (not_eligible_df, "/Users/krishnaveni/Desktop/noteligible1.csv"),
):
    frame.write.csv(target, header=True)


Json file Data
+-----+-------------+-----------+---+------+-------------+----------+-----+-------+---------+
|   id|     Gym_name|Person_name|age|  type|       street| City_name|state|country|unique_id|
+-----+-------------+-----------+---+------+-------------+----------+-----+-------+---------+
|12345|Fitness World| Jane Smith| 25| home1| 111 Pine St.|   Anytown|   CA|    USA|        1|
|12345|Fitness World| Jane Smith| 25| home2|222 Maple St.|Otherville|   CA|    USA|        1|
|12345|Fitness World| Jane Smith| 25|office|  333 Oak St.| Cityville|   CA|    USA|        1|
|12345|Fitness World| John singh| 30| home1| 123 Main St.|   Anytown|   CA|    USA|        2|
|12345|Fitness World| John singh| 30| home2|  456 Elm St.|Otherville|   CA|    USA|        2|
|12345|Fitness World| John singh| 30|office|  789 Oak St.| Cityville|   CA|    USA|        2|
|12345|Fitness World|Kiddo singh| 11| home1| 111 Pine St.|   Anytown|   CA|    USA|        3|
|12345|Fitness World|Kiddo singh| 11| home2|2