In [4]:
!pip install pyspark



In [5]:
from pyspark.sql import SparkSession

# Attach to the active SparkSession, or start one with default settings if none exists.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)
# Bare expression so the notebook displays the session object.
spark

In [6]:
# Load the 2015 flight summary CSV. The first line is treated as the header, and Spark
# derives column types from the data (the plans below show `count` read as int).
flightData2015 = spark.read.csv(
    "./data/2015-summary.csv",
    header=True,
    inferSchema=True,
)

In [7]:
# Print the first 20 rows as a text table, truncating long cell values (Spark's defaults).
flightData2015.show(n=20, truncate=True)

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

In [8]:
# Bring the first three rows back to the driver as a list of Row objects.
flightData2015.head(3)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

In [9]:
# Print the physical plan for an ascending sort on `count` without running the query.
flightData2015.orderBy("count").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#19 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(count#19 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=45]
      +- FileScan csv [DEST_COUNTRY_NAME#17,ORIGIN_COUNTRY_NAME#18,count#19] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/c:/Users/user/Desktop/FATEC - BD Negócios/Projetos/Python/Spark/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




In [10]:
# Number of partitions used after a shuffle. The plan above still shows the default of 200;
# queries planned after this cell use this value instead.
# NOTE(review): 5 presumably chosen because the dataset is small — consider moving to a config cell at the top.
SHUFFLE_PARTITIONS = "5"
spark.conf.set("spark.sql.shuffle.partitions", SHUFFLE_PARTITIONS)

In [11]:
%%time

flightData2015.sort("count").take(2)

CPU times: total: 0 ns
Wall time: 401 ms


[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

In [12]:
# Expose the DataFrame as a temporary view so the spark.sql queries below can reference it by name.
flightData2015.createOrReplaceTempView(name="flight_data_2015")

In [13]:
# Count distinct destination countries: the SQL yields one row per destination, and an
# empty groupBy().count() collapses those rows into a single total.
(
    spark.sql("""

        SELECT DEST_COUNTRY_NAME
        FROM flight_data_2015
        GROUP BY DEST_COUNTRY_NAME

    """)
    .groupBy()
    .count()
    .show()
)

+-----+
|count|
+-----+
|  132|
+-----+



In [14]:
# Largest value of the `count` column, computed with Spark SQL.
spark.sql(
    "SELECT max(count) from flight_data_2015"
).show()

+----------+
|max(count)|
+----------+
|    370002|
+----------+



In [16]:
# Import the functions module under an alias rather than `from ... import max`, which would
# shadow Python's built-in max() in every later cell of the notebook.
from pyspark.sql import functions as F

# DataFrame-API equivalent of the SQL max(count) query in the previous cell.
flightData2015.select(F.max("count")).take(1)

[Row(max(count)=370002)]

In [19]:
# Top five destination countries by summed `count` across all origins.
topDestinationsQuery = """
    
    SELECT 
        DEST_COUNTRY_NAME, sum(count) as destination_total
    FROM flight_data_2015
    GROUP BY DEST_COUNTRY_NAME
    ORDER BY sum(count) DESC
    LIMIT 5
                   
"""
# NOTE(review): despite its name, maxSql holds a top-5 ranking, not a single max;
# the name is kept for compatibility with other cells that refer to it.
maxSql = spark.sql(topDestinationsQuery)
maxSql.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [22]:
# Print the physical plan of the top-5 destinations query built in the previous cell.
# Reusing that DataFrame avoids maintaining a second, copy-pasted copy of the same SQL.
maxSql.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- TakeOrderedAndProject(limit=5, orderBy=[destination_total#185L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#17,destination_total#185L])
   +- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[sum(count#19)])
      +- Exchange hashpartitioning(DEST_COUNTRY_NAME#17, 5), ENSURE_REQUIREMENTS, [plan_id=444]
         +- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[partial_sum(count#19)])
            +- FileScan csv [DEST_COUNTRY_NAME#17,count#19] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/c:/Users/user/Desktop/FATEC - BD Negócios/Projetos/Python/Spark/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>




In [20]:
# `sum` is imported under an alias so Python's built-in sum() is not shadowed.
from pyspark.sql.functions import desc, sum as spark_sum

# DataFrame-API equivalent of the top-5 SQL query above. Naming the aggregate with .alias()
# avoids depending on Spark's auto-generated "sum(count)" column name; withColumnRenamed is a
# silent no-op when that name does not match, which would make the sort below fail.
(
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .agg(spark_sum("count").alias("destination_total"))
    .sort(desc("destination_total"))
    .limit(5)
    .show()
)

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+

