In [None]:
from pyspark.sql import SparkSession


# Build (or reuse, via getOrCreate) a single-threaded local SparkSession.
#
# Memory is set to 1g rather than a smaller value: Spark's unified memory
# manager reserves 300 MiB and refuses to start when the JVM heap or
# spark.executor.memory is below 1.5x that (~450 MiB), so a setting such as
# "256m" aborts session creation with an IllegalArgumentException.
spark = (
    SparkSession.builder
    .appName("BigDataAnalysis")
    .master("local[1]")
    .config("spark.driver.memory", "1g")
    .config("spark.executor.memory", "1g")
    .config("spark.ui.enabled", "false")  # skip starting the Spark web UI
    .getOrCreate()
)

print(f"Spark Version: {spark.version}")

In [None]:
import random
from pyspark.sql.functions import lit, col
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Define schema: one integer id plus four nullable string columns.
schema = StructType([
    StructField("RecordNumber", IntegerType(), True),
    StructField("Country", StringType(), True),
    StructField("City", StringType(), True),
    StructField("Zipcode", StringType(), True),
    StructField("State", StringType(), True)
])

# Sample data from simple-zipcodes.csv
cities = ["PARC PARQUE", "PASEO COSTA DEL SUR", "BDA SAN LUIS", "HOLT", "HOMOSASSA", 
          "CINGULAR WIRELESS", "FORT WORTH", "FT WORTH", "SPRUCE PINE", "ASH HILL", 
          "URB EUGENE RICE", "MESA", "HILLIARD", "HOLDER", "SECT LANAUSSE", 
          "SPRING GARDEN", "SPRINGVILLE", "ASHEBORO"]
states = ["PR", "FL", "TX", "AL", "NC", "AZ"]
zipcodes = ["704", "709", "32564", "34487", "76166", "76177", "35585", "27007", 
           "85209", "85210", "32046", "34445", "35146", "27203", "27204"]

# Generate 100,000 rows
num_rows = 100000

# Use a dedicated, seeded generator so that every run of this cell (and every
# Restart & Run All) produces the same synthetic data, and therefore the same
# counts and plots downstream. The global `random` state is left untouched.
rng = random.Random(42)

# City, Zipcode and State are drawn independently of one another, so each row
# is a synthetic combination rather than a real city/zip/state triple.
data = [
    (i, "US", rng.choice(cities), rng.choice(zipcodes), rng.choice(states))
    for i in range(1, num_rows + 1)
]

# Create DataFrame
df = spark.createDataFrame(data, schema)
df.show(5, truncate=False)
df.printSchema()

In [None]:
# Report the DataFrame's dimensions. count() runs a Spark job over the data;
# the column count comes from the schema alone.
row_total = df.count()
column_total = len(df.columns)
print(f"Number of rows: {row_total}")
print(f"Number of columns: {column_total}")

In [None]:
from pyspark.sql.functions import count, when

# Per-column null tally: when() yields a non-null literal only for rows where
# the column is null, and count() counts just those non-null results.
null_counts = [
    count(when(col(name).isNull(), name)).alias(name)
    for name in df.columns
]
df.select(null_counts).show()

In [None]:
# NOTE(review): this cell repeats the dimension report from an earlier cell
# and triggers another full count() job; consider removing one of them.
n_rows, n_cols = df.count(), len(df.columns)
print(f"Number of rows: {n_rows}")
print(f"Number of columns: {n_cols}")

In [None]:
from pyspark.sql.functions import count, when

# NOTE(review): this cell repeats the per-column null check from an earlier
# cell; consider removing one of them.
null_count_exprs = []
for column_name in df.columns:
    is_missing = col(column_name).isNull()
    # count() skips nulls, so only rows where the column is missing are tallied.
    null_count_exprs.append(count(when(is_missing, column_name)).alias(column_name))

df.select(null_count_exprs).show()

In [None]:
from pyspark.sql.functions import count

# Tally, per state, the rows that have a non-null Zipcode, largest first.
# NOTE(review): count() counts rows, not distinct zip codes -- confirm this is
# the intended metric for "zip_count".
records_per_state = df.groupBy("State")
zip_by_state = (
    records_per_state
    .agg(count("Zipcode").alias("zip_count"))
    .orderBy("zip_count", ascending=False)
)
zip_by_state.show()

In [None]:
# Ten most frequent cities: count rows per city, sort descending, keep 10.
city_counts = df.groupBy("City").count()
top_cities = city_counts.orderBy("count", ascending=False).limit(10)
top_cities.show()

In [None]:
from pyspark.sql.functions import avg

# Mean RecordNumber within each state, listed alphabetically by state.
mean_record = avg("RecordNumber").alias("avg_record")
avg_record_by_state = (
    df.groupBy("State")
    .agg(mean_record)
    .orderBy("State")
)
avg_record_by_state.show()

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# Install matplotlib and seaborn if needed
!pip install matplotlib seaborn

# Zip Codes by State
zip_by_state_pd = zip_by_state.toPandas()
plt.figure(figsize=(10, 6))
sns.barplot(data=zip_by_state_pd, x="State", y="zip_count")
plt.xlabel("State")
plt.ylabel("Number of Zip Codes")
plt.title("Distribution of Zip Codes by State")
plt.show()

# Top 10 Cities
top_cities_pd = top_cities.toPandas()
plt.figure(figsize=(10, 6))
sns.barplot(data=top_cities_pd, x="count", y="City")
plt.xlabel("Number of Records")
plt.ylabel("City")
plt.title("Top 10 Cities by Frequency")
plt.show()

In [None]:
# Shut down the Spark session at the end of the notebook. DataFrames created
# from it (df and the aggregates above) presumably cannot be evaluated after
# this point -- re-run the session-creation cell first if they are needed again.
spark.stop()