# Import and initialize

In [2]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
# Start (or reuse) a Spark session whose master is the local machine ("local[*]").
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)
# SparkContext of that session; the RDD examples below go through it.
sc = spark.sparkContext

[How the default minimum number of partitions is chosen when an RDD is created with the `textFile` function](https://github.com/apache/spark/blob/branch-3.1/core/src/main/scala/org/apache/spark/SparkContext.scala#L2501)

_min(defaultParallelism, 2)_

In [57]:
# Show the default parallelism of this SparkContext (the master is "local[*]").
sc.defaultParallelism

12

In [3]:
import os

import pyspark.sql.functions as F
import pyspark.sql.types as T

In [4]:
# Root folder of the input files. Set the EXERCISE5_DATA_DIR environment variable to
# run the notebook on another machine; the default is the original local path.
data_directory = os.environ.get("EXERCISE5_DATA_DIR", "D:/BigData/Assignments/Exercise5/Data")

How the number of partitions is determined when a DataFrame is created with the `spark.read.csv` function

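_ceil(file_size / split_size)_, where _split_size = min(maxPartitionBytes, max(openCostInBytes, (file_size + openCostInBytes) / defaultParallelism))_. With the defaults (maxPartitionBytes = 128 MB, openCostInBytes = 4 MB) this is roughly _min(defaultParallelism, ceil(file_size / 4 MB))_ for a single file of moderate size.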

In [67]:
# JEOPARDY_CSV.csv is about 34,308 KB; check how many partitions the reader creates.
test_df_1 = spark.read.csv(path=f"{data_directory}/JEOPARDY_CSV.csv")
test_df_1.rdd.getNumPartitions()

9

In [63]:
# mnist_test.csv is about 17,875 KB; check how many partitions the reader creates.
test_df_2 = spark.read.csv(path=f"{data_directory}/mnist_test.csv")
test_df_2.rdd.getNumPartitions()

5

In [64]:
# Test.csv is about 6,458 KB; check how many partitions the reader creates.
test_df_3 = spark.read.csv(path=f"{data_directory}/Test.csv")
test_df_3.rdd.getNumPartitions()

2

In [66]:
# Train.csv is about 51,489 KB; check how many partitions the reader creates.
test_df_4 = spark.read.csv(path=f"{data_directory}/Train.csv")
test_df_4.rdd.getNumPartitions()

12

# Word count

## Create RDD

In [41]:
# Read the corpus as an RDD of lines and spread it over as many partitions as the
# context's default parallelism (instead of a machine-specific hard-coded 12).
wordcount_rdd = sc.textFile(data_directory + "/bible+shakes.nopunc").repartition(sc.defaultParallelism)

In [6]:
# Number of records in the RDD (textFile yields one record per line of the file).
wordcount_rdd.count()

156215

In [7]:
# Peek at a single record to see what a line looks like.
wordcount_rdd.first()

'holy bible authorized king james version textfile 890904'

In [42]:
# Confirm the partition count that resulted from the repartition call.
wordcount_rdd.getNumPartitions()

12

## Map reduce

In [44]:
# Word count: tokenize each line, pair every word with 1, sum per word, then sort by
# descending frequency and show the ten most frequent words.
# - str.split() with no argument splits on any run of whitespace and never yields
#   empty strings, so blank lines and repeated spaces no longer produce '' "words".
# - take(10) fetches only the leading ten records instead of collecting the whole
#   sorted result to the driver and slicing it there.
wordcount_rdd.flatMap(lambda line: line.split()) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda value_1, value_2: value_1 + value_2) \
             .sortBy(lambda pair: pair[1], ascending=False) \
             .take(10)

[('the', 93739),
 ('and', 79182),
 ('of', 53121),
 ('to', 33929),
 ('i', 30240),
 ('that', 24407),
 ('in', 24350),
 ('a', 23504),
 ('my', 17312),
 ('he', 17087)]

# Kmer count

## Create RDD

In [9]:
# Load the FASTA file as an RDD with one record per line (default partitioning).
kmercount_rdd = sc.textFile(f"{data_directory}/ecoli.fa")

In [10]:
# Number of records in the RDD (textFile yields one record per line of the file).
kmercount_rdd.count()

66283

In [46]:
# Partition count chosen by textFile when no explicit partitioning is requested.
kmercount_rdd.getNumPartitions()

2

## Map reduce

In [12]:
def split_into_sequences(line, kmer_length=9):
    """Return every overlapping k-mer (substring of length ``kmer_length``) in ``line``.

    Args:
        line: Text to slide the window over.
        kmer_length: Window size; must be a positive integer.

    Returns:
        A list of the ``len(line) - kmer_length + 1`` substrings, ordered by start
        position. The list is empty when ``line`` is shorter than ``kmer_length``.

    Raises:
        ValueError: If ``kmer_length`` is less than 1.
    """
    if kmer_length < 1:
        # A non-positive window would otherwise yield empty or truncated strings.
        raise ValueError("kmer_length must be a positive integer")
    # range() of a negative number is empty, so short lines give [] without a clamp.
    window_count = len(line) - kmer_length + 1
    return [line[start:start + kmer_length] for start in range(window_count)]

In [13]:
# K-mer count: split each sequence line into overlapping k-mers, sum the occurrences
# of each k-mer, sort by descending frequency and show the ten most frequent ones.
# - FASTA description lines start with '>' and are not sequence data, so they are
#   dropped before k-mers are extracted.
# - take(10) fetches only the leading ten records instead of collecting the whole
#   sorted result to the driver and slicing it there.
# NOTE(review): k-mers that span two consecutive lines of the file are not counted,
# because every line is processed independently.
kmercount_rdd.filter(lambda line: not line.startswith('>')) \
             .flatMap(lambda line: split_into_sequences(line)) \
             .map(lambda sequence: (sequence, 1)) \
             .reduceByKey(lambda value_1, value_2: value_1 + value_2) \
             .sortBy(lambda pair: pair[1], ascending=False) \
             .take(10)

[('CCAGCGCCA', 258),
 ('CAGCGCCAG', 252),
 ('GCGCTGGCG', 238),
 ('CGCTGGCGG', 224),
 ('CTGGCGCTG', 221),
 ('CGCCAGCAG', 221),
 ('CGCCAGCGC', 214),
 ('GCCAGCGCC', 213),
 ('TGGCGCTGG', 204),
 ('CCGCCAGCG', 200)]

# Flight data

## Schema

In [14]:
# Explicit schema for airports.dat, written as (column name, type, nullable) triples
# in file-column order.
airports_schema = T.StructType([
    T.StructField(column_name, column_type, is_nullable)
    for column_name, column_type, is_nullable in [
        ('Airport ID', T.IntegerType(), False),
        ('Name', T.StringType(), True),
        ('City', T.StringType(), True),
        ('Country', T.StringType(), True),
        ('IATA', T.StringType(), True),
        ('ICAO', T.StringType(), True),
        ('Latitude', T.FloatType(), True),
        ('Longitude', T.FloatType(), True),
        ('Altitude', T.FloatType(), True),
        ('Timezone', T.FloatType(), True),
        ('DST', T.StringType(), True),
        ('Tz database time zone', T.StringType(), True),
        ('Type', T.StringType(), True),
        ('Source', T.StringType(), True),
    ]
])

In [15]:
# List the fields of the airports schema to verify names, types and nullability.
airports_schema.fields

[StructField(Airport ID,IntegerType,false),
 StructField(Name,StringType,true),
 StructField(City,StringType,true),
 StructField(Country,StringType,true),
 StructField(IATA,StringType,true),
 StructField(ICAO,StringType,true),
 StructField(Latitude,FloatType,true),
 StructField(Longitude,FloatType,true),
 StructField(Altitude,FloatType,true),
 StructField(Timezone,FloatType,true),
 StructField(DST,StringType,true),
 StructField(Tz database time zone,StringType,true),
 StructField(Type,StringType,true),
 StructField(Source,StringType,true)]

In [16]:
# Explicit schema for routes.dat, written as (column name, type, nullable) triples
# in file-column order.
routes_schema = T.StructType([
    T.StructField(column_name, column_type, is_nullable)
    for column_name, column_type, is_nullable in [
        ('Airline', T.StringType(), True),
        ('Airline ID', T.IntegerType(), False),
        ('Source airport', T.StringType(), True),
        ('Source airport ID', T.IntegerType(), False),
        ('Destination airport', T.StringType(), True),
        ('Destination airport ID', T.IntegerType(), False),
        ('Codeshare', T.StringType(), True),
        ('Stops', T.IntegerType(), True),
        ('Equipment', T.StringType(), True),
    ]
])

In [17]:
# List the fields of the routes schema to verify names, types and nullability.
routes_schema.fields

[StructField(Airline,StringType,true),
 StructField(Airline ID,IntegerType,false),
 StructField(Source airport,StringType,true),
 StructField(Source airport ID,IntegerType,false),
 StructField(Destination airport,StringType,true),
 StructField(Destination airport ID,IntegerType,false),
 StructField(Codeshare,StringType,true),
 StructField(Stops,IntegerType,true),
 StructField(Equipment,StringType,true)]

## Create DataFrame

In [18]:
# Load airports.dat with the explicit schema. The OpenFlights files mark missing
# values with the literal \N, so that marker is mapped to null instead of being kept
# as a string or treated as an unparsable number.
airports_df = spark.read.csv(
    data_directory + "/airports.dat",
    schema=airports_schema,
    nullValue="\\N",
)

In [19]:
# Preview the first rows of the airports DataFrame.
airports_df.show()

+----------+--------------------+--------------+----------------+----+----+---------+----------+--------+--------+---+---------------------+-------+-----------+
|Airport ID|                Name|          City|         Country|IATA|ICAO| Latitude| Longitude|Altitude|Timezone|DST|Tz database time zone|   Type|     Source|
+----------+--------------------+--------------+----------------+----+----+---------+----------+--------+--------+---+---------------------+-------+-----------+
|         1|      Goroka Airport|        Goroka|Papua New Guinea| GKA|AYGA| -6.08169|   145.392|  5282.0|    10.0|  U| Pacific/Port_Moresby|airport|OurAirports|
|         2|      Madang Airport|        Madang|Papua New Guinea| MAG|AYMD| -5.20708|   145.789|    20.0|    10.0|  U| Pacific/Port_Moresby|airport|OurAirports|
|         3|Mount Hagen Kagam...|   Mount Hagen|Papua New Guinea| HGU|AYMH| -5.82679|   144.296|  5388.0|    10.0|  U| Pacific/Port_Moresby|airport|OurAirports|
|         4|      Nadzab Airport| 

In [20]:
# Load routes.dat with the explicit schema. The OpenFlights files mark missing
# values with the literal \N, so that marker is mapped to null instead of being kept
# as a string or treated as an unparsable number.
routes_df = spark.read.csv(
    data_directory + "/routes.dat",
    schema=routes_schema,
    nullValue="\\N",
)

In [21]:
# Preview the first rows of the routes DataFrame.
routes_df.show()

+-------+----------+--------------+-----------------+-------------------+----------------------+---------+-----+---------+
|Airline|Airline ID|Source airport|Source airport ID|Destination airport|Destination airport ID|Codeshare|Stops|Equipment|
+-------+----------+--------------+-----------------+-------------------+----------------------+---------+-----+---------+
|     2B|       410|           AER|             2965|                KZN|                  2990|     null|    0|      CR2|
|     2B|       410|           ASF|             2966|                KZN|                  2990|     null|    0|      CR2|
|     2B|       410|           ASF|             2966|                MRV|                  2962|     null|    0|      CR2|
|     2B|       410|           CEK|             2968|                KZN|                  2990|     null|    0|      CR2|
|     2B|       410|           CEK|             2968|                OVB|                  4078|     null|    0|      CR2|
|     2B|       

## Count the number of airports

In [22]:
# Total number of rows (airports) in the DataFrame.
airports_df.count()

7698

## Filter airports by country

In [23]:
# Collect the distinct values of the 'Country' column into a plain Python list.
countries_list = [row['Country'] for row in airports_df.select('Country').distinct().collect()]
# Backward-compatible alias: the original, misspelled name may still be used elsewhere.
conutries_list = countries_list
print('Number of countries: ', len(countries_list))
countries_list[:5]

Number of countries:  237


['Chad', 'Paraguay', 'Anguilla', 'Russia', 'British Indian Ocean Territory']

In [24]:
# Keep only the airports whose 'Country' equals 'Paraguay' and display them.
airports_df.where(airports_df['Country'] == 'Paraguay').show()

+----------+--------------------+--------------------+--------+----+----+----------+----------+--------+--------+---+---------------------+-------+-----------+
|Airport ID|                Name|                City| Country|IATA|ICAO|  Latitude| Longitude|Altitude|Timezone|DST|Tz database time zone|   Type|     Source|
+----------+--------------------+--------------------+--------+----+----+----------+----------+--------+--------+---+---------------------+-------+-----------+
|      2699|Silvio Pettirossi...|            Asuncion|Paraguay| ASU|SGAS|    -25.24|    -57.52|   292.0|    -4.0|  S|     America/Asuncion|airport|OurAirports|
|      2700|Juan De Ayolas Ai...|              Ayolas|Paraguay| AYO|SGAY|-27.370554|-56.854065|   223.0|    -4.0|  S|     America/Asuncion|airport|OurAirports|
|      2701|Teniente Col Carm...|          Conception|Paraguay| CIO|SGCO|-23.442364|-57.427254|   253.0|    -4.0|  S|     America/Asuncion|airport|OurAirports|
|      2702|      Itaipú Airport|       

## Group airports by country and count

In [25]:
# One row per country with the number of airports in a column named 'count'.
airports_groupby_country_df = (
    airports_df
    .groupBy('Country')
    .agg(F.count('*').alias('count'))
)

In [26]:
# Number of groups, i.e. distinct 'Country' values in airports_df.
airports_groupby_country_df.count()

237

In [27]:
# Preview the per-country airport counts.
airports_groupby_country_df.show()

+--------------------+-----+
|             Country|count|
+--------------------+-----+
|                Chad|    6|
|            Paraguay|    9|
|            Anguilla|    1|
|              Russia|  264|
|British Indian Oc...|    1|
|               Yemen|   11|
|             Senegal|   11|
|              Sweden|   77|
|            Kiribati|   18|
|              Guyana|   13|
|              Jersey|    1|
|         Philippines|   71|
|               Burma|   43|
|             Eritrea|    3|
|               Tonga|    6|
|      Norfolk Island|    1|
|            Djibouti|    3|
|            Malaysia|   40|
|           Singapore|    6|
|                Fiji|   19|
+--------------------+-----+
only showing top 20 rows



## Join two dataframes

In [28]:
# Attach the airport record of each route's destination. The inner join keeps only
# routes whose 'Destination airport ID' matches an 'Airport ID' in airports_df.
routes_join_destination_airport_df = routes_df.join(
    other=airports_df,
    on=(routes_df['Destination airport ID'] == airports_df['Airport ID']),
    how='inner',
)

In [29]:
# Show the route columns together with the destination airport's country.
routes_join_destination_airport_df.select([
    'Airline',
    'Airline ID',
    'Source airport',
    'Source airport ID',
    'Destination airport',
    'Destination airport ID',
    'Country',
]).show()

+-------+----------+--------------+-----------------+-------------------+----------------------+-------+
|Airline|Airline ID|Source airport|Source airport ID|Destination airport|Destination airport ID|Country|
+-------+----------+--------------+-----------------+-------------------+----------------------+-------+
|     2B|       410|           AER|             2965|                KZN|                  2990| Russia|
|     2B|       410|           ASF|             2966|                KZN|                  2990| Russia|
|     2B|       410|           ASF|             2966|                MRV|                  2962| Russia|
|     2B|       410|           CEK|             2968|                KZN|                  2990| Russia|
|     2B|       410|           CEK|             2968|                OVB|                  4078| Russia|
|     2B|       410|           DME|             4029|                KZN|                  2990| Russia|
|     2B|       410|           DME|             4029|  

## Count the number of flights arriving in each country

In [30]:
# Number of joined routes per destination country ('Country' comes from airports_df).
count_routes_by_destination_country_df = (
    routes_join_destination_airport_df
    .groupBy('Country')
    .count()
)

In [31]:
# Number of distinct destination countries that appear in the joined routes.
count_routes_by_destination_country_df.count()

224

In [32]:
# Preview the per-country route counts.
count_routes_by_destination_country_df.show()

+--------------+-----+
|       Country|count|
+--------------+-----+
|          Chad|   10|
|        Russia| 1833|
|      Paraguay|   22|
|      Anguilla|    6|
|         Yemen|   90|
|       Senegal|   69|
|        Sweden|  476|
|      Kiribati|    7|
|        Guyana|    9|
|   Philippines|  394|
|         Burma|  104|
|        Jersey|   38|
|       Eritrea|    8|
|      Djibouti|   17|
|         Tonga|    5|
|Norfolk Island|    2|
|     Singapore|  412|
|      Malaysia|  565|
|          Fiji|   65|
|        Turkey|  986|
+--------------+-----+
only showing top 20 rows

