In [4]:
# Initialize the spark on jupyter notebook

import findspark
findspark.init()


# To create the dataframe in pyspark 

from pyspark.sql import SparkSession

In [5]:
# Initializing the sparksesstion 

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [7]:
# Reading the csv file in pyspark 

df = spark.read.csv('country_data.csv')

In [8]:
#Show First ten records
df.show(10)

+---+--------------+---+-------------+-------+
|_c0|           _c1|_c2|          _c3|    _c4|
+---+--------------+---+-------------+-------+
|  1|         Kabul|AFG|        Kabol|1780000|
|  2|      Qandahar|AFG|     Qandahar| 237500|
|  3|         Herat|AFG|        Herat| 186800|
|  4|Mazar-e-Sharif|AFG|        Balkh| 127800|
|  5|     Amsterdam|NLD|Noord-Holland| 731200|
|  6|     Rotterdam|NLD| Zuid-Holland| 593321|
|  7|          Haag|NLD| Zuid-Holland| 440900|
|  8|       Utrecht|NLD|      Utrecht| 234323|
|  9|     Eindhoven|NLD|Noord-Brabant| 201843|
| 10|       Tilburg|NLD|Noord-Brabant| 193238|
+---+--------------+---+-------------+-------+
only showing top 10 rows



In [9]:
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)



In [30]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

schema = StructType([StructField('_c0', IntegerType(), True), 
                      StructField('City', StringType(), True), 
                      StructField('Country_Code', StringType(), True),
                      StructField('Province', StringType(), True),
                      StructField('Population', IntegerType(), True)
                     ])

In [31]:
# Redefining the dataframe with correct column labels 

df = spark.read.csv('country_data.csv', header=True, schema=schema)
df.show(5)

+---+--------------+------------+-------------+----------+
|_c0|          City|Country_Code|     Province|Population|
+---+--------------+------------+-------------+----------+
|  2|      Qandahar|         AFG|     Qandahar|    237500|
|  3|         Herat|         AFG|        Herat|    186800|
|  4|Mazar-e-Sharif|         AFG|        Balkh|    127800|
|  5|     Amsterdam|         NLD|Noord-Holland|    731200|
|  6|     Rotterdam|         NLD| Zuid-Holland|    593321|
+---+--------------+------------+-------------+----------+
only showing top 5 rows



In [32]:
# To count the total rows in the dataframe.
print('Number of rows: \t', df.count())

# Number of columns can be counted in this way
print('Number of columns: \t', len(df.columns))

# To view the names of the columns
print('\nName of columns: \n', df.columns)

Number of rows: 	 4078
Number of columns: 	 5

Name of columns: 
 ['_c0', 'City', 'Country_Code', 'Province', 'Population']


In [33]:
# Droping unwanted row from datafrmae
df.drop('_c0').show(5)

+--------------+------------+-------------+----------+
|          City|Country_Code|     Province|Population|
+--------------+------------+-------------+----------+
|      Qandahar|         AFG|     Qandahar|    237500|
|         Herat|         AFG|        Herat|    186800|
|Mazar-e-Sharif|         AFG|        Balkh|    127800|
|     Amsterdam|         NLD|Noord-Holland|    731200|
|     Rotterdam|         NLD| Zuid-Holland|    593321|
+--------------+------------+-------------+----------+
only showing top 5 rows



In [34]:
# Selecting two columns from the dataframe 
df.select('City', 'Population').show(5)

+--------------+----------+
|          City|Population|
+--------------+----------+
|      Qandahar|    237500|
|         Herat|    186800|
|Mazar-e-Sharif|    127800|
|     Amsterdam|    731200|
|     Rotterdam|    593321|
+--------------+----------+
only showing top 5 rows



In [35]:
# Convert spark dataframe to Pandas dataframe
df.toPandas().head()

Unnamed: 0,_c0,City,Country_Code,Province,Population
0,2,Qandahar,AFG,Qandahar,237500
1,3,Herat,AFG,Herat,186800
2,4,Mazar-e-Sharif,AFG,Balkh,127800
3,5,Amsterdam,NLD,Noord-Holland,731200
4,6,Rotterdam,NLD,Zuid-Holland,593321


In [36]:
# By default the data is orderd by country, change to Population
df.orderBy('Population', ascending=False).show(5)

+----+---------------+------------+------------+----------+
| _c0|           City|Country_Code|    Province|Population|
+----+---------------+------------+------------+----------+
|1024|Mumbai (Bombay)|         IND| Maharashtra|  10500000|
|2331|          Seoul|         KOR|       Seoul|   9981619|
| 206|      São Paulo|         BRA|   São Paulo|   9968485|
|1890|       Shanghai|         CHN|    Shanghai|   9696300|
| 939|        Jakarta|         IDN|Jakarta Raya|   9604900|
+----+---------------+------------+------------+----------+
only showing top 5 rows



We see the Highest Population City is Mumbai(India)

In [37]:
# Descriptive statistics of the datafrmae
df.describe('Population').show()

+-------+-----------------+
|summary|       Population|
+-------+-----------------+
|  count|             4078|
|   mean|350117.6763119176|
| stddev|723518.0762669161|
|    min|               42|
|    max|         10500000|
+-------+-----------------+



The average population in the dataset is 350117

In [46]:
# distinct() removes the repeatition in the selection 
cities = df.select('City', 'Population').distinct()
cities.show(10)

+--------------------+----------+
|                City|Population|
+--------------------+----------+
|         Belize City|     55810|
|             Atibaia|    100356|
|          Birmingham|   1013000|
|            Brighton|    156124|
|           Koudougou|    105000|
|Midnapore (Medini...|    125498|
|Tellicherry (Thal...|    103579|
|            Hachioji|    513451|
|                Gifu|    408007|
|            Ichihara|    279280|
+--------------------+----------+
only showing top 10 rows



In [47]:
# We can sort the dataframe by selected column name 
cities.sort('City').show(10)

+--------------------+----------+
|                City|Population|
+--------------------+----------+
|A Coruña (La Coruña)|    243402|
|              Aachen|    243825|
|             Aalborg|    161161|
|                 Aba|    298900|
|              Abadan|    206073|
|          Abaetetuba|    111258|
|              Abakan|    169200|
|          Abbotsford|    105403|
|            Abeokuta|    427400|
|            Aberdeen|    213070|
+--------------------+----------+
only showing top 10 rows



In [48]:
# Now we can count the total number of countries 
cities.count()

4078

In [50]:
# Filter is to make selection from the dataframe

cities.filter((cities['Population'] > 1000000)).show()

+----------------+----------+
|            City|Population|
+----------------+----------+
|      Birmingham|   1013000|
|        Shanghai|   9696300|
|        Nanchang|   1691600|
|          Quezon|   2173831|
|          Peking|   7472000|
|         Tianjin|   5286800|
|            Cali|   2077386|
|  Nezahualcóyotl|   1224924|
|Harkova [Harkiv]|   1500000|
|            Pune|   1566651|
|       Hyderabad|   1151274|
|        Curitiba|   1584232|
|          Lusaka|   1317000|
|Ho Chi Minh City|   3980000|
|            Giza|   2221868|
|            Perm|   1009700|
|        Ludhiana|   1042740|
|          Beirut|   1100000|
|   St Petersburg|   4694000|
|          Manaus|   1255049|
+----------------+----------+
only showing top 20 rows



In [51]:
# Missing values in the dataframe 
for col_name in df.columns:
    print(col_name, '\t\t\t', df.filter(df[col_name].isNull()).count())

_c0 			 0
City 			 0
Country_Code 			 0
Province 			 4
Population 			 0


We have four missing values in the Province column

In [53]:
# Filling the missing values 
df_nonull = df.fillna({'Province': 'Not Available'})

for col_name in df_nonull.columns:
    print(col_name, '\t\t\t', df_nonull.filter(df_nonull[col_name].isNull()).count())

_c0 			 0
City 			 0
Country_Code 			 0
Province 			 0
Population 			 0
