In [42]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
from pyspark.sql.functions import max
from pyspark.sql.functions import col
spark = SparkSession.builder.appName('Spark Training').getOrCreate()
sc = spark.sparkContext

In [23]:
df = spark.read.format('csv').options(delimiter=',', header=True , inferSchema=True).load('/content/sample_data/population.csv')

In [30]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType,IntegerType

In [32]:
schema = StructType([
    StructField(name="Country Name", dataType=StringType(), nullable=True),
    StructField(name="Country Code", dataType=StringType(), nullable=True),
    StructField(name="Year", dataType=IntegerType(), nullable=True),
    StructField(name="Value", dataType=IntegerType(), nullable=True)
])

In [33]:
df.printSchema()

root
 |-- Country Name: string (nullable = true)
 |-- Country Code: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Value: double (nullable = true)



In [35]:
avg_value = df.groupBy("Country Name").agg(avg("Value").alias("average_value"))
avg_value.show()

+--------------------+-------------------+
|        Country Name|      average_value|
+--------------------+-------------------+
|          South Asia|  1.2133345478125E9|
|                Chad|       8040221.1875|
| Lower middle income| 1.83420376521875E9|
|            Paraguay|     4215777.890625|
| Low & middle income|4.258080837546875E9|
|Heavily indebted ...|     4.3425936975E8|
|               World|  5.4587022159375E9|
|    Congo, Dem. Rep.|  4.5710188265625E7|
|             Senegal|     8870310.578125|
|          Cabo Verde|        391508.9375|
|              Sweden|     8753814.609375|
|East Asia & Pacif...|   1.567344636625E9|
|            Kiribati|        81998.15625|
|Least developed c...|     5.9735165275E8|
|              Guyana|        741505.5625|
|             Eritrea|     2083260.234375|
|         Philippines|  6.7997814796875E7|
|Pacific island sm...|      1792425.40625|
|            Djibouti|        573017.6875|
|               Tonga|       96705.234375|
+----------

In [37]:
result = df.groupBy("Country Name").agg(
    max("Value").alias("max_value")
)
result.show()

+--------------------+-------------+
|        Country Name|    max_value|
+--------------------+-------------+
|          South Asia|1.951539835E9|
|                Chad|  1.9319064E7|
| Lower middle income| 3.07977878E9|
|            Paraguay|    6844146.0|
| Low & middle income|6.633109634E9|
|Heavily indebted ...| 9.17304254E8|
|               World|8.061876001E9|
|    Congo, Dem. Rep.| 1.05789731E8|
|             Senegal|  1.8077573E7|
|          Cabo Verde|     522331.0|
|              Sweden|  1.0536632E7|
|East Asia & Pacif...|2.111139152E9|
|            Kiribati|     132530.0|
|Least developed c...|1.161055545E9|
|              Guyana|     826353.0|
|             Eritrea|    3470390.0|
|         Philippines| 1.14891199E8|
|Pacific island sm...|    2689224.0|
|            Djibouti|    1152944.0|
|               Tonga|     107570.0|
+--------------------+-------------+
only showing top 20 rows



In [38]:
asc_sorted = df.orderBy("Value")
asc_sorted.show()

+--------------------+------------+----+------+
|        Country Name|Country Code|Year| Value|
+--------------------+------------+----+------+
|Sint Maarten (Dut...|         SXM|1960|2715.0|
|Sint Maarten (Dut...|         SXM|1961|2970.0|
|Sint Maarten (Dut...|         SXM|1962|3264.0|
|Sint Maarten (Dut...|         SXM|1963|3584.0|
|Sint Maarten (Dut...|         SXM|1964|3922.0|
|St. Martin (Frenc...|         MAF|1960|4250.0|
|Sint Maarten (Dut...|         SXM|1965|4282.0|
|St. Martin (Frenc...|         MAF|1961|4386.0|
|St. Martin (Frenc...|         MAF|1962|4527.0|
|               Nauru|         NRU|1960|4607.0|
|Sint Maarten (Dut...|         SXM|1966|4664.0|
|St. Martin (Frenc...|         MAF|1963|4673.0|
|               Nauru|         NRU|1961|4774.0|
|St. Martin (Frenc...|         MAF|1964|4827.0|
|               Nauru|         NRU|1962|4979.0|
|St. Martin (Frenc...|         MAF|1965|4996.0|
|Sint Maarten (Dut...|         SXM|1967|5071.0|
|St. Martin (Frenc...|         MAF|1966|

In [39]:
desc_sorted = df.orderBy("Value", ascending=False)
desc_sorted.show()

+----------------+------------+----+--------------+
|    Country Name|Country Code|Year|         Value|
+----------------+------------+----+--------------+
|           World|         WLD|2023| 8.061876001E9|
|           World|         WLD|2022|7.9899815195E9|
|           World|         WLD|2021| 7.921184346E9|
|           World|         WLD|2020|7.8561387895E9|
|           World|         WLD|2019| 7.776892015E9|
|           World|         WLD|2018|7.6964948475E9|
|           World|         WLD|2017|7.6141135515E9|
|           World|         WLD|2016|7.5285233335E9|
|           World|         WLD|2015|7.4414717205E9|
|           World|         WLD|2014|7.3539109255E9|
|           World|         WLD|2013| 7.265314967E9|
|           World|         WLD|2012| 7.175816385E9|
|           World|         WLD|2011|7.0862016635E9|
|           World|         WLD|2010| 7.000671233E9|
|           World|         WLD|2009| 6.916129027E9|
|IDA & IBRD total|         IBT|2023| 6.858957145E9|
|           

In [40]:
df1 = df.select('Country Name', 'Country Code').distinct()
df1.count()

265

In [43]:
df2 = df.select(col('Country Code').alias('ctry_cd'), 'Value', 'Year').distinct()
df2.count()

16930

In [51]:
df1.join(df2, col('Country Code') == col('ctry_cd')).show()


+-----------------+------------+-------+------------+----+
|     Country Name|Country Code|ctry_cd|       Value|Year|
+-----------------+------------+-------+------------+----+
|      Afghanistan|         AFG|    AFG| 1.6250794E7|1994|
|   American Samoa|         ASM|    ASM|     27995.0|1972|
|          Austria|         AUT|    AUT|   7270889.0|1965|
|          Burundi|         BDI|    BDI|   4681390.0|1982|
|          Belgium|         BEL|    BEL| 1.0478617E7|2005|
|            Benin|         BEN|    BEN|   8953969.0|2007|
|     Burkina Faso|         BFA|    BFA| 1.6176498E7|2010|
|         Bulgaria|         BGR|    BGR|   8312068.0|1997|
|         Bulgaria|         BGR|    BGR|   6446596.0|2023|
|          Belarus|         BLR|    BLR|   9979610.0|2000|
|           Belize|         BLZ|    BLZ|    126445.0|1973|
|           Brazil|         BRA|    BRA|1.95284734E8|2011|
|Brunei Darussalam|         BRN|    BRN|    142189.0|1972|
|  Channel Islands|         CHI|    CHI|    155309.0|200

In [52]:
df1.join(df2, col('Country Code') == col('ctry_cd')).drop(col('ctry_cd')).show(5,False)

+--------------+------------+-----------+----+
|Country Name  |Country Code|Value      |Year|
+--------------+------------+-----------+----+
|Afghanistan   |AFG         |1.6250794E7|1994|
|American Samoa|ASM         |27995.0    |1972|
|Austria       |AUT         |7270889.0  |1965|
|Burundi       |BDI         |4681390.0  |1982|
|Belgium       |BEL         |1.0478617E7|2005|
+--------------+------------+-----------+----+
only showing top 5 rows

