# Big data distribution- Json, Pyspark aggregate functions

Name - Ojas Patil

Reg No - 21BAI1106

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=b2f41ae28597546c21137cd5552d6c3f51736a939e88ca835bd167d8694ee85c
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


## Create your own JSON file containing 20 records based on the syntax. Using Pyspark, create a spark session, load the Json file and define its schema, and perform the following operations on it.

1. Json to data frame
2. Use Json_normalise function
3. To_json
4. From_json
5. json_tuple
6. get_json_object
7. Schema_of_json

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [3]:
spark = SparkSession.builder.appName("ReadJson").getOrCreate()

In [4]:
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("city", StringType(), True)
])

In [6]:
json_file_path = "/content/people.json"
df = spark.read.json(json_file_path, schema=schema)
df.show()

+------+----+----------+
|  name| age|      city|
+------+----+----------+
|  NULL|NULL|      NULL|
|   Dev|  35|   Chennai|
|  Hari|  32|      Pune|
|Bhavya|  30|     Delhi|
| Sanya|  21| Ahmedabad|
|Ishaan|  29|    Jaipur|
| Qamar|  34|    Kanpur|
|  Neha|  36|    Indore|
|  Esha|  22| Bangalore|
|  Jaya|  38|   Lucknow|
| Gauri|  26| Ahmedabad|
|Farhan|  40| Hyderabad|
| Omkar|  23|    Jaipur|
|Preeti|  31|     Surat|
| Leela|  33|    Bhopal|
|   Raj|  37|    Nagpur|
|Manish|  27|     Patna|
| Kabir|  24|Chandigarh|
|Tushar|  39|   Lucknow|
| Aarav|  25|    Mumbai|
+------+----+----------+
only showing top 20 rows



In [7]:
import pandas as pd

pandas_df = df.toPandas()
pandas_df.head()

Unnamed: 0,name,age,city
0,,,
1,Dev,35.0,Chennai
2,Hari,32.0,Pune
3,Bhavya,30.0,Delhi
4,Sanya,21.0,Ahmedabad


In [8]:
normalized_df = pd.json_normalize(pandas_df["name"])

normalized_df["age"] = pandas_df["age"]
normalized_df["city"] = pandas_df["city"]

normalized_df.head()

Unnamed: 0,age,city
0,,
1,35.0,Chennai
2,32.0,Pune
3,30.0,Delhi
4,21.0,Ahmedabad


In [9]:
from pyspark.sql.functions import to_json

json_string_column = to_json("Hello this is testing")
print(json_string_column)

Column<'to_json(Hello this is testing)'>


In [13]:
from pyspark.sql.functions import from_json

parsed_df = df.withColumn("new_name", from_json("name", schema))
print(parsed_df)

DataFrame[name: string, age: int, city: string, new_name: struct<name:string,age:int,city:string>]


In [44]:
from pyspark.sql.functions import json_tuple

data = [
    ("Dev", '''{"age": 35, "city": "Chennai"}'''),
    ("Hari", '''{"age": 32, "city": "Pune"}''')
]

df = spark.createDataFrame(data, ("key", "jstring"))
df.select(df.key, json_tuple(df.jstring, 'f1', 'f2')).collect()

[Row(key='Dev', c0=None, c1=None), Row(key='Hari', c0=None, c1=None)]

In [45]:
from pyspark.sql.functions import get_json_object, col

data = [
    ("Dev", '''{"age": 35, "city": "Chennai"}'''),
    ("Hari", '''{"age": 32, "city": "Pune"}''')
]

df = spark.createDataFrame(data, ["key", "json_string"])

df = df.withColumn("f1", get_json_object(col("json_string"), "$.f1"))
df = df.withColumn("f2", get_json_object(col("json_string"), "$.f2"))

df.show()

+----+--------------------+----+----+
| key|         json_string|  f1|  f2|
+----+--------------------+----+----+
| Dev|{"age": 35, "city...|NULL|NULL|
|Hari|{"age": 32, "city...|NULL|NULL|
+----+--------------------+----+----+



In [48]:
data = [
    ("1", '''{"name": "Dev", "age": 35, "city": "Chennai"}'''),
    ("2", '''{"name": "Hari", "age": 32, "city": "Pune"}'''),
    ("3", '''{"name": "Alice", "age": 28, "city": "Bangalore"}'''),
    ("4", '''{"name": "John", "age": 30, "city": "Mumbai"}'''),
    ("5", '''{"name": "Emma", "age": 27, "city": "Delhi"}''')
]

df = spark.createDataFrame(data, ["key", "json_data"])

json_schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("city", StringType(), True)
])

df = df.withColumn("parsed_data", from_json(col("json_data"), json_schema))

df = df.withColumn("name", col("parsed_data.name"))
df = df.withColumn("age", col("parsed_data.age"))
df = df.withColumn("city", col("parsed_data.city"))

df.show()

+---+--------------------+--------------------+-----+---+---------+
|key|           json_data|         parsed_data| name|age|     city|
+---+--------------------+--------------------+-----+---+---------+
|  1|{"name": "Dev", "...|  {Dev, 35, Chennai}|  Dev| 35|  Chennai|
|  2|{"name": "Hari", ...|    {Hari, 32, Pune}| Hari| 32|     Pune|
|  3|{"name": "Alice",...|{Alice, 28, Banga...|Alice| 28|Bangalore|
|  4|{"name": "John", ...|  {John, 30, Mumbai}| John| 30|   Mumbai|
|  5|{"name": "Emma", ...|   {Emma, 27, Delhi}| Emma| 27|    Delhi|
+---+--------------------+--------------------+-----+---+---------+



## Using Pyspark, creast a session and schema for the "zipcode.csv", dataset. Using the dataset given, perform the following operations.

1. Select, select first and last
2. Sum, average and collect_set
3. Count and distinct count
4. Kurtosis and skewness
5. Min, Max, standard deviation and variance

In [49]:
df = spark.read.csv('/content/zipcodes.csv',header=True,inferSchema=True)
df.show()

+------------+-------+-----------+-------------------+-----+--------------+-----+-------+-----+-----+-----+-----------+-------+--------------------+--------------------+-------------+---------------+-------------------+----------+-------------+
|RecordNumber|Zipcode|ZipCodeType|               City|State|  LocationType|  Lat|   Long|Xaxis|Yaxis|Zaxis|WorldRegion|Country|        LocationText|            Location|Decommisioned|TaxReturnsFiled|EstimatedPopulation|TotalWages|        Notes|
+------------+-------+-----------+-------------------+-----+--------------+-----+-------+-----+-----+-----+-----------+-------+--------------------+--------------------+-------------+---------------+-------------------+----------+-------------+
|           1|    704|   STANDARD|        PARC PARQUE|   PR|NOT ACCEPTABLE|17.96| -66.22| 0.38|-0.87|  0.3|         NA|     US|     Parc Parque, PR|NA-US-PR-PARC PARQUE|        false|           NULL|               NULL|      NULL|         NULL|
|           2|    70

### Q1. Select, select first and last

In [50]:
selected_columns = df.select("RecordNumber", "City", "State", "Country")
selected_columns.show()


+------------+-------------------+-----+-------+
|RecordNumber|               City|State|Country|
+------------+-------------------+-----+-------+
|           1|        PARC PARQUE|   PR|     US|
|           2|PASEO COSTA DEL SUR|   PR|     US|
|          10|       BDA SAN LUIS|   PR|     US|
|       61391|  CINGULAR WIRELESS|   TX|     US|
|       61392|         FORT WORTH|   TX|     US|
|       61393|           FT WORTH|   TX|     US|
|           4|    URB EUGENE RICE|   PR|     US|
|       39827|               MESA|   AZ|     US|
|       39828|               MESA|   AZ|     US|
|       49345|           HILLIARD|   FL|     US|
|       49346|             HOLDER|   FL|     US|
|       49347|               HOLT|   FL|     US|
|       49348|          HOMOSASSA|   FL|     US|
|          10|       BDA SAN LUIS|   PR|     US|
|           3|      SECT LANAUSSE|   PR|     US|
|       54354|      SPRING GARDEN|   AL|     US|
|       54355|        SPRINGVILLE|   AL|     US|
|       54356|      

In [51]:
first_row = df.first()
print("First Row:")
print(first_row)

First Row:
Row(RecordNumber=1, Zipcode=704, ZipCodeType='STANDARD', City='PARC PARQUE', State='PR', LocationType='NOT ACCEPTABLE', Lat=17.96, Long=-66.22, Xaxis=0.38, Yaxis=-0.87, Zaxis=0.3, WorldRegion='NA', Country='US', LocationText='Parc Parque, PR', Location='NA-US-PR-PARC PARQUE', Decommisioned=False, TaxReturnsFiled=None, EstimatedPopulation=None, TotalWages=None, Notes=None)


In [52]:
last_row = df.collect()[-1]
print("Last Row:")
print(last_row)

Last Row:
Row(RecordNumber=76513, Zipcode=27204, ZipCodeType='PO BOX', City='ASHEBORO', State='NC', LocationType='PRIMARY', Lat=35.71, Long=-79.81, Xaxis=0.14, Yaxis=-0.79, Zaxis=0.58, WorldRegion='NA', Country='US', LocationText='Asheboro, NC', Location='NA-US-NC-ASHEBORO', Decommisioned=False, TaxReturnsFiled=1035, EstimatedPopulation=1816, TotalWages=30322473, Notes=None)


## Q2. Sum, average and collect_set

In [56]:
from pyspark.sql.functions import sum, avg, collect_set

In [59]:
sum_total_wages = df.select(sum("TotalWages")).collect()[0][0]
print("Sum of TotalWages:", sum_total_wages)

Sum of TotalWages: 1914421629


In [60]:
avg_estimated_population = df.select(avg("EstimatedPopulation")).collect()[0][0]
print("Average of EstimatedPopulation:", avg_estimated_population)

Average of EstimatedPopulation: 8893.818181818182


In [61]:
collected_city_state = df.select(collect_set("City").alias("Cities"), collect_set("State").alias("States")).collect()[0]
print("Collected Cities:", collected_city_state["Cities"])
print("Collected States:", collected_city_state["States"])

Collected Cities: ['URB EUGENE RICE', 'BDA SAN LUIS', 'SPRUCE PINE', 'HOMOSASSA', 'FORT WORTH', 'SPRINGVILLE', 'PASEO COSTA DEL SUR', 'FT WORTH', 'HILLIARD', 'SPRING GARDEN', 'CINGULAR WIRELESS', 'HOLDER', 'MESA', 'ASH HILL', 'PARC PARQUE', 'SECT LANAUSSE', 'HOLT', 'ASHEBORO']
Collected States: ['AL', 'TX', 'NC', 'AZ', 'PR', 'FL']


### Q3. Count and distinct count


In [63]:
from pyspark.sql.functions import count, countDistinct

In [64]:
total_rows = df.count()
print("Total Rows:", total_rows)

Total Rows: 21


In [65]:
distinct_zipcodes = df.select(countDistinct("Zipcode").alias("DistinctZipcodes")).collect()[0]["DistinctZipcodes"]
print("Distinct Zipcodes:", distinct_zipcodes)

Distinct Zipcodes: 17


### Q4. Kurtosis and skewness

In [71]:
from pyspark.sql.functions import kurtosis, skewness

In [72]:
kurtosis_result = df.select(kurtosis("Lat").alias("Kurtosis_Lat")).collect()[0]
print("Kurtosis for the 'Lat' column:", kurtosis_result["Kurtosis_Lat"])

Kurtosis for the 'Lat' column: -1.2097757752091982


In [73]:
skewness_result = df.select(skewness("Lat").alias("Skewness_Lat")).collect()[0]
print("Skewness for the 'Lat' column:", skewness_result["Skewness_Lat"])

Skewness for the 'Lat' column: -0.7239329176078787


### Q5. Min, Max, standard deviation and variance


In [76]:
from pyspark.sql.functions import min, max, stddev, variance

In [77]:
min_max_result = df.select(min("TaxReturnsFiled").alias("Min_TaxReturnsFiled"), max("TaxReturnsFiled").alias("Max_TaxReturnsFiled")).collect()[0]
stdvar_result = df.select(stddev("TaxReturnsFiled").alias("StdDev_TaxReturnsFiled"), variance("TaxReturnsFiled").alias("Variance_TaxReturnsFiled")).collect()[0]

In [78]:
min_max_result = df.select(min("TaxReturnsFiled").alias("Min_TaxReturnsFiled"), max("TaxReturnsFiled").alias("Max_TaxReturnsFiled")).collect()[0]
stdvar_result = df.select(stddev("TaxReturnsFiled").alias("StdDev_TaxReturnsFiled"), variance("TaxReturnsFiled").alias("Variance_TaxReturnsFiled")).collect()[0]

print("Minimum value for 'TaxReturnsFiled' column:", min_max_result["Min_TaxReturnsFiled"])
print("Maximum value for 'TaxReturnsFiled' column:", min_max_result["Max_TaxReturnsFiled"])
print("Standard Deviation for 'TaxReturnsFiled' column:", stdvar_result["StdDev_TaxReturnsFiled"])
print("Variance for 'TaxReturnsFiled' column:", stdvar_result["Variance_TaxReturnsFiled"])

Minimum value for 'TaxReturnsFiled' column: 610
Maximum value for 'TaxReturnsFiled' column: 14962
Standard Deviation for 'TaxReturnsFiled' column: 5324.035345828986
Variance for 'TaxReturnsFiled' column: 28345352.363636363
