In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=edfa8e4fe37af8938e2cede9870319b644a90bfc7be204f7b2c941e8701081b3
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [6]:
from pyspark.sql import SparkSession
from pyspark.sql import Row

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

In [7]:
dataDF = spark.read.option("header", "true").option("inferSchema", "true").csv("worldcities.csv")

In [8]:
dataDF.show(5)

+-------+----------+-------+--------+-----------+----+----+-----------+-------+----------+----------+
|   city|city_ascii|    lat|     lng|    country|iso2|iso3| admin_name|capital|population|        id|
+-------+----------+-------+--------+-----------+----+----+-----------+-------+----------+----------+
|  Tokyo|     Tokyo|35.6897|139.6922|      Japan|  JP| JPN|      Tōkyō|primary|  3.7977E7|1392685764|
|Jakarta|   Jakarta|-6.2146|106.8451|  Indonesia|  ID| IDN|    Jakarta|primary|   3.454E7|1360771077|
|  Delhi|     Delhi|  28.66|   77.23|      India|  IN| IND|      Delhi|  admin|  2.9617E7|1356872604|
| Mumbai|    Mumbai|18.9667| 72.8333|      India|  IN| IND|Mahārāshtra|  admin|  2.3355E7|1356226629|
| Manila|    Manila|   14.6|120.9833|Philippines|  PH| PHL|     Manila|primary|  2.3088E7|1608618140|
+-------+----------+-------+--------+-----------+----+----+-----------+-------+----------+----------+
only showing top 5 rows



In [9]:
dataDF.count()

41001

In [10]:
dataDF.write.mode('overwrite').partitionBy('country')\
            .option('compression', 'snappy')\
            .option('partitionOverwriteMode', 'dynamic')\
            .save('worldcities')

In [11]:
dataDFParquet = spark.read.parquet('worldcities')

In [12]:
dataDF.count()

41001

# Homework

In [13]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

In [14]:
# Define the schema for the DataFrame
schema = StructType([
    StructField("city", StringType(), True),
    StructField("city_ascii", StringType(), True),
    StructField("lat", DoubleType(), True),
    StructField("lng", DoubleType(), True),
    StructField("country", StringType(), True),
    StructField("iso2", StringType(), True),
    StructField("iso3", StringType(), True),
    StructField("admin_name", StringType(), True),
    StructField("capital", StringType(), True),
    StructField("population", IntegerType(), True),
    StructField("id", IntegerType(), True)
])

In [15]:
# Read the CSV file into a DataFrame
dataDF = spark.read.option("header", "true")\
                   .schema(schema)\
                   .csv("worldcities.csv")

In [17]:
# Show the first 5 row of the DataFrame
dataDF.show(5)

+-------+----------+-------+--------+-----------+----+----+-----------+-------+----------+----------+
|   city|city_ascii|    lat|     lng|    country|iso2|iso3| admin_name|capital|population|        id|
+-------+----------+-------+--------+-----------+----+----+-----------+-------+----------+----------+
|  Tokyo|     Tokyo|35.6897|139.6922|      Japan|  JP| JPN|      Tōkyō|primary|  37977000|1392685764|
|Jakarta|   Jakarta|-6.2146|106.8451|  Indonesia|  ID| IDN|    Jakarta|primary|  34540000|1360771077|
|  Delhi|     Delhi|  28.66|   77.23|      India|  IN| IND|      Delhi|  admin|  29617000|1356872604|
| Mumbai|    Mumbai|18.9667| 72.8333|      India|  IN| IND|Mahārāshtra|  admin|  23355000|1356226629|
| Manila|    Manila|   14.6|120.9833|Philippines|  PH| PHL|     Manila|primary|  23088000|1608618140|
+-------+----------+-------+--------+-----------+----+----+-----------+-------+----------+----------+
only showing top 5 rows



In [18]:
# Count the number of rows in the DataFrame
print("Total number of rows:", dataDF.count())

Total number of rows: 41001


In [19]:
# Calculate the total population for each country and order by population in descending order
results = dataDF.groupBy("country").sum("population").orderBy("sum(population)", ascending=False)

In [20]:
# Show the top 5 countries by total population
results.show(5)

+-------------+---------------+
|      country|sum(population)|
+-------------+---------------+
|        China|     1446111841|
|United States|      401453709|
|        India|      270309635|
|       Brazil|      198554881|
|        Japan|      138634635|
+-------------+---------------+
only showing top 5 rows



In [21]:
# Save the results to a CSV file
results.write.mode('overwrite').option("header", "true").csv("country_population.csv")