In [1]:
!pip install pyspark



## Task 1 (Analysis)

#### Import libraries

In [1]:
from pyspark import SparkContext
from datetime import datetime
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


####  Creating a SparkContext in Apache Spark

In [2]:
sc=SparkContext(master="local")

**Trip Columns**

1. **Departure Date and Time**
2. **Departure Airport**
3. **Departure Airport Code**
4. **Arrival Airport**
5. **Arrival Airport Code**
6. **Arrival Date and Time**
7. **Airline**
8. **Distance (in km)**
9. **Aircraft Model**
10. **Total Seats**
11. **Occupied Seats**


#### Loading a Text File into an RDD in Apache Spark

In [3]:
rdd_data=sc.textFile("/content/drive/My Drive/Colab Notebooks/Spark/trip.txt",3)

In [4]:
rdd_data.count()

1000000

#### Print 2 row from data

In [5]:
rdd_data.take(2)

["2024-05-07 12:16:35,O'Hare International Airport,ORD,Suvarnabhumi Airport,BKK,2024-05-08 03:34:35,Air France,13778Km,Airbus A319,150,140",
 '2024-04-15 07:24:08,Suvarnabhumi Airport,BKK,Sydney Kingsford-Smith Airport,SYD,2024-04-15 15:45:08,Emirates,7522Km,Boeing 737-700,148,146']

#### Transforming an RDD: Splitting Lines into Lists

In [6]:
rdd_1=rdd_data.map(lambda x:x.split(","))

In [7]:
rdd_1.take(2)

[['2024-05-07 12:16:35',
  "O'Hare International Airport",
  'ORD',
  'Suvarnabhumi Airport',
  'BKK',
  '2024-05-08 03:34:35',
  'Air France',
  '13778Km',
  'Airbus A319',
  '150',
  '140'],
 ['2024-04-15 07:24:08',
  'Suvarnabhumi Airport',
  'BKK',
  'Sydney Kingsford-Smith Airport',
  'SYD',
  '2024-04-15 15:45:08',
  'Emirates',
  '7522Km',
  'Boeing 737-700',
  '148',
  '146']]

### Find the number of flights per month that land at "Amsterdam Airport Schiphol" in 2024. Print the results.

In [8]:
rdd_Amsterdam_1=rdd_1.filter(lambda x: x[3]=="Amsterdam Airport Schiphol"and datetime.strptime(x[5], '%Y-%m-%d %H:%M:%S').year == 2024)

In [9]:
rdd_Amsterdam_1.take(3)

[['2024-01-01 22:25:52',
  'John F. Kennedy International Airport',
  'JFK',
  'Amsterdam Airport Schiphol',
  'AMS',
  '2024-01-02 04:55:52',
  'American Airlines',
  '5854Km',
  'Embraer ERJ170',
  '76',
  '76'],
 ['2024-01-27 19:21:43',
  'Seoul Incheon International Airport',
  'ICN',
  'Amsterdam Airport Schiphol',
  'AMS',
  '2024-01-28 04:51:43',
  'China Southern Airlines',
  '8561Km',
  'Airbus A321',
  '210',
  '210'],
 ['2024-04-11 13:51:38',
  'Sao Paulo-Guarulhos International Airport',
  'GRU',
  'Amsterdam Airport Schiphol',
  'AMS',
  '2024-04-12 00:43:38',
  'Emirates',
  '9785Km',
  'Boeing 737-700',
  '148',
  '147']]

In [10]:
rdd_Amsterdam_2=rdd_Amsterdam_1.map(lambda x:(x[3],1))

In [11]:
rdd_Amsterdam_2.reduceByKey(lambda x,y:x+y).collect()

[('Amsterdam Airport Schiphol', 33263)]

 ### Find the flights departing from “Tokyo Haneda Airport” between 09:00 and 18:00. Store the results in a file.

In [12]:
filtered_flights = rdd_1.filter(lambda x: x[1] == "Tokyo Haneda Airport" )

In [13]:
filtered_flights_2 = filtered_flights.filter(lambda x: 9 <= datetime.strptime(x[0], '%Y-%m-%d %H:%M:%S').hour <= 18)


In [14]:
filtered_flights_2.take(3)

[['2024-05-09 13:12:08',
  'Tokyo Haneda Airport',
  'HND',
  'John F. Kennedy International Airport',
  'JFK',
  '2024-05-10 01:17:08',
  'American Airlines',
  '10887Km',
  'Airbus A321',
  '210',
  '200'],
 ['2024-01-11 17:41:07',
  'Tokyo Haneda Airport',
  'HND',
  'Los Angeles International Airport',
  'LAX',
  '2024-01-12 03:29:07',
  'Air France',
  '8822Km',
  'Boeing 737-700',
  '148',
  '142'],
 ['2024-03-25 16:54:25',
  'Tokyo Haneda Airport',
  'HND',
  'Los Angeles International Airport',
  'LAX',
  '2024-03-26 02:42:25',
  'American Airlines',
  '8822Km',
  'Boeing 737-700',
  '148',
  '144']]

In [15]:
filtered_flights_2.saveAsTextFile("/content/drive/My Drive/flights_from_haneda_between_9_to_18.txt")

### Find all “American Airlines” flights that depart from "Seoul Incheon International Airport" and store the results in a file.

#### Filtering an RDD: Selecting Rows for "American Airlines"

In [16]:
American_Airlines_1=rdd_1.filter(lambda x:x[6]=='American Airlines')

In [17]:
American_Airlines_1.take(3)

[['2024-01-28 16:15:19',
  'John F. Kennedy International Airport',
  'JFK',
  'Miami International Airport',
  'MIA',
  '2024-01-28 18:12:19',
  'American Airlines',
  '1759Km',
  'Bombardier CRJ-900',
  '90',
  '80'],
 ['2024-01-01 22:25:52',
  'John F. Kennedy International Airport',
  'JFK',
  'Amsterdam Airport Schiphol',
  'AMS',
  '2024-01-02 04:55:52',
  'American Airlines',
  '5854Km',
  'Embraer ERJ170',
  '76',
  '76'],
 ['2024-04-05 02:08:19',
  'Dubai International Airport',
  'DXB',
  'Kuala Lumpur International Airport',
  'KUL',
  '2024-04-05 08:18:19',
  'American Airlines',
  '5555Km',
  'AVRO RJ100',
  '112',
  '108']]

#### Filtering an RDD: Selecting Flights from "Seoul Incheon International Airport" for "American Airlines"


In [18]:
American_Airlines_2=American_Airlines_1.filter(lambda x:x[1]=='Seoul Incheon International Airport')

In [19]:
American_Airlines_2.take(3)

[['2024-04-15 11:34:02',
  'Seoul Incheon International Airport',
  'ICN',
  'Denver International Airport',
  'DEN',
  '2024-04-15 22:38:02',
  'American Airlines',
  '9970Km',
  'Bombardier CRJ-900',
  '90',
  '84'],
 ['2024-03-13 17:15:06',
  'Seoul Incheon International Airport',
  'ICN',
  'McCarran International Airport',
  'LAS',
  '2024-03-14 04:00:06',
  'American Airlines',
  '9679Km',
  'AVRO RJ100',
  '112',
  '105'],
 ['2024-01-27 06:54:39',
  'Seoul Incheon International Airport',
  'ICN',
  'Kuala Lumpur International Airport',
  'KUL',
  '2024-01-27 12:01:39',
  'American Airlines',
  '4617Km',
  'Airbus A319',
  '150',
  '146']]

#### Saving the Filtered RDD to a Text File in Google Drive

In [20]:
American_Airlines_2.saveAsTextFile("/content/drive/My Drive/All American Airlines flights that depart from Seoul Incheon International Airport.txt") #save on my google drive

### Find the number of overnight flights per month in 2024. The term "overnight flights" refers to flights that depart between 20:00 and 23:00 and arrive the next day after 03:00. Store the results in a file.

#### Filtering an RDD: Selecting Flights from the Year 2024


In [21]:
overnight_flights_1=rdd_1.filter(lambda x:datetime.strptime(x[5], '%Y-%m-%d %H:%M:%S').year == 2024)

In [22]:
overnight_flights_1.take(3)

[['2024-05-07 12:16:35',
  "O'Hare International Airport",
  'ORD',
  'Suvarnabhumi Airport',
  'BKK',
  '2024-05-08 03:34:35',
  'Air France',
  '13778Km',
  'Airbus A319',
  '150',
  '140'],
 ['2024-04-15 07:24:08',
  'Suvarnabhumi Airport',
  'BKK',
  'Sydney Kingsford-Smith Airport',
  'SYD',
  '2024-04-15 15:45:08',
  'Emirates',
  '7522Km',
  'Boeing 737-700',
  '148',
  '146'],
 ['2024-02-16 02:47:10',
  'Shanghai Pudong International Airport',
  'PVG',
  'San Francisco International Airport',
  'SFO',
  '2024-02-16 13:46:10',
  'Delta Air Lines',
  '9888Km',
  'Embraer ERJ170',
  '76',
  '67']]

#### Filtering an RDD: Selecting Flights Departing Between 8 PM and Midnight (from 20 to 23)


In [23]:
overnight_flights_2=overnight_flights_1.filter(lambda x:20 <= datetime.strptime(x[0], '%Y-%m-%d %H:%M:%S').hour <= 23)

In [24]:
overnight_flights_2.take(3)

[['2024-01-01 22:25:52',
  'John F. Kennedy International Airport',
  'JFK',
  'Amsterdam Airport Schiphol',
  'AMS',
  '2024-01-02 04:55:52',
  'American Airlines',
  '5854Km',
  'Embraer ERJ170',
  '76',
  '76'],
 ['2024-01-18 21:15:52',
  'Charles de Gaulle Airport',
  'CDG',
  'Miami International Airport',
  'MIA',
  '2024-01-19 05:26:52',
  'Emirates',
  '7379Km',
  'Embraer ERJ195',
  '118',
  '115'],
 ['2024-04-26 21:14:36',
  'Charlotte Douglas International Airport',
  'CLT',
  'Dallas-Fort Worth International Airport',
  'DFW',
  '2024-04-26 22:54:36',
  'China Southern Airlines',
  '1504Km',
  'Airbus A321',
  '210',
  '203']]

#### Filtering an RDD: Selecting Flights After 3 AM on the Next Day


In [25]:
overnight_flights_3=overnight_flights_2.filter(lambda x:3 <datetime.strptime(x[5], '%Y-%m-%d %H:%M:%S').hour and datetime.strptime(x[5], '%Y-%m-%d %H:%M:%S').day+1)

In [26]:
overnight_flights_3.take(3)

[['2024-01-01 22:25:52',
  'John F. Kennedy International Airport',
  'JFK',
  'Amsterdam Airport Schiphol',
  'AMS',
  '2024-01-02 04:55:52',
  'American Airlines',
  '5854Km',
  'Embraer ERJ170',
  '76',
  '76'],
 ['2024-01-18 21:15:52',
  'Charles de Gaulle Airport',
  'CDG',
  'Miami International Airport',
  'MIA',
  '2024-01-19 05:26:52',
  'Emirates',
  '7379Km',
  'Embraer ERJ195',
  '118',
  '115'],
 ['2024-04-26 21:14:36',
  'Charlotte Douglas International Airport',
  'CLT',
  'Dallas-Fort Worth International Airport',
  'DFW',
  '2024-04-26 22:54:36',
  'China Southern Airlines',
  '1504Km',
  'Airbus A321',
  '210',
  '203']]

#### Mapping an RDD: Assigning a Constant Value for Flights in 2024


In [27]:
overnight_flights_4=overnight_flights_3.map(lambda x:('2024',1))

In [28]:
overnight_flights_4.take(3)

[('2024', 1), ('2024', 1), ('2024', 1)]

#### Reducing an RDD: Counting Flights for 2024


In [29]:
overnight_flights_4.reduceByKey(lambda x,y:x+y).collect()

[('2024', 138129)]

#### Saving the RDD to a Text File: Exporting Results to Google Drive


In [30]:
American_Airlines_2.saveAsTextFile("/content/drive/My Drive/overnight flights per month in 2024.txt")

#### The occupancy rate for a flight refers to the percentage of available seats on an aircraft that are filled with passengers. For example, if a plane has 200 seats and there are 180 passengers on board, the occupancy rate would be:
(180 passengers / 200 seats) * 100 = 90% occupancy rateTask 5: Find the occupancy rate for each flight departing from “John F. Kennedy International Airport” between 09:00 and 15:00. Store the results in a file.

#### Filtering RDD: Flights Departing from John F. Kennedy Airport Between 9 AM and 3 PM


In [31]:
occupancy_rate_1=rdd_1.filter(lambda x:x[1]=='John F. Kennedy International Airport').filter(lambda x:9 <= datetime.strptime(x[0], '%Y-%m-%d %H:%M:%S').hour <= 15)

In [32]:
occupancy_rate_1.take(2)

[['2024-01-15 09:40:56',
  'John F. Kennedy International Airport',
  'JFK',
  "O'Hare International Airport",
  'ORD',
  '2024-01-15 10:59:56',
  'Emirates',
  '1189Km',
  'Bombardier CRJ-900',
  '90',
  '86'],
 ['2024-04-01 14:33:23',
  'John F. Kennedy International Airport',
  'JFK',
  'Miami International Airport',
  'MIA',
  '2024-04-01 16:30:23',
  'Air Canada',
  '1759Km',
  'Embraer ERJ195',
  '118',
  '112']]

#### Mapping an RDD: Calculating Occupancy Rate for Flights


In [33]:
occupancy_rate_2=occupancy_rate_1.map(lambda x:(x[1],(int(x[10]) / int(x[9])) * 100 ))

In [34]:
occupancy_rate_2.take(5)

[('John F. Kennedy International Airport', 95.55555555555556),
 ('John F. Kennedy International Airport', 94.91525423728814),
 ('John F. Kennedy International Airport', 98.33333333333333),
 ('John F. Kennedy International Airport', 99.33333333333333),
 ('John F. Kennedy International Airport', 94.91525423728814)]

#### Rounding Occupancy Rate Values: Mapping with mapValues()

In [35]:
occupancy_rate_3=occupancy_rate_2.mapValues(lambda x:round(x,2))

In [36]:
occupancy_rate_3.take(3)

[('John F. Kennedy International Airport', 95.56),
 ('John F. Kennedy International Airport', 94.92),
 ('John F. Kennedy International Airport', 98.33)]

#### Saving the Occupancy Rate RDD to a Text File


In [37]:
occupancy_rate_3.saveAsTextFile("/content/drive/My Drive/occupancy rate.txt")

##### End This Task

## Task 2 (Calculate Total Price)

#### Loading Data into an RDD from a Text File

**Order_items Columns**

1. **ID**
2. **Product Code**
3. **Quantity**
4. **Price**


In [38]:
Rdd_data=sc.textFile('/content/drive/My Drive/Colab Notebooks/Spark/Order_items.txt')

In [39]:
Rdd_data.count()

172198

In [40]:
Rdd_data.take(3)

['1, 957, 1, 299.98', '2, 1073, 1, 199.99', '2, 502, 5, 50.0']

#### Transforming the RDD with the map function

In [41]:
Rdd_1=Rdd_data.map(lambda x:x.split(','))

In [42]:
Rdd_1.take(3)

[['1', ' 957', ' 1', ' 299.98'],
 ['2', ' 1073', ' 1', ' 199.99'],
 ['2', ' 502', ' 5', ' 50.0']]

#### Transforming Data to Calculate Total Price


In [43]:
Rdd_2=Rdd_1.map(lambda x: (x[0], float(x[2])*float(x[3])))

In [44]:
Rdd_2.take(10)

[('1', 299.98),
 ('2', 199.99),
 ('2', 250.0),
 ('2', 129.99),
 ('4', 49.98),
 ('4', 299.95),
 ('4', 150.0),
 ('4', 199.92),
 ('5', 299.98),
 ('5', 299.95)]

#### Aggregated Data by ID

In [45]:
Rdd_3=Rdd_2.reduceByKey(lambda x,y: x+y)

In [46]:
Rdd_3.take(10)

[('1', 299.98),
 ('2', 579.98),
 ('4', 699.85),
 ('5', 1129.8600000000001),
 ('7', 579.9200000000001),
 ('8', 729.8399999999999),
 ('9', 599.96),
 ('10', 651.9200000000001),
 ('11', 919.79),
 ('12', 1299.8700000000001)]

#### Rounding Aggregated Values to Two Decimal Places


In [47]:
Rdd_4=Rdd_3.mapValues(lambda x:round(x,2))

In [48]:
Rdd_4.take(10)

[('1', 299.98),
 ('2', 579.98),
 ('4', 699.85),
 ('5', 1129.86),
 ('7', 579.92),
 ('8', 729.84),
 ('9', 599.96),
 ('10', 651.92),
 ('11', 919.79),
 ('12', 1299.87)]

##### End This Task

## Task 3 (Join)

#### Reading Data from a Text File


**Students Columns**

1. **ID**
2. **First Name**
3. **Last Name**
4. **Group ID**


In [49]:
students=sc.textFile('/content/drive/My Drive/Colab Notebooks/Spark/students.txt')

In [50]:
students.count()

10

In [51]:
students.take(10)

['6000,Muath,Omer,01',
 '6001,Jehad,Shadi,01',
 '6002,Salma,Ahmad,01',
 '6003,Sara,Ali,01',
 '6004,Suha,Jamil,01',
 '6005,Omer,Amer,01',
 '7000,Jamil,Khalid,02',
 '7001,Ali,Salem,02',
 '7002,Ahmad,Ali,02',
 '7003,Rasha,Shadi,02']

**Majors Columns**

1. **Group ID**
2. **Department**


#### Reading Data from a Text File


In [52]:
majors=sc.textFile('/content/drive/My Drive/Colab Notebooks/Spark/majors.txt')

In [53]:
majors.count()

3

In [54]:
majors.take(3)

['01,Computer Science', '02,Data Science', '03,CIS']

#### Splitting Data in Each Line and Store in List


In [55]:
SRdd=students.map(lambda x:x.split(","))

In [56]:
SRdd.take(10)

[['6000', 'Muath', 'Omer', '01'],
 ['6001', 'Jehad', 'Shadi', '01'],
 ['6002', 'Salma', 'Ahmad', '01'],
 ['6003', 'Sara', 'Ali', '01'],
 ['6004', 'Suha', 'Jamil', '01'],
 ['6005', 'Omer', 'Amer', '01'],
 ['7000', 'Jamil', 'Khalid', '02'],
 ['7001', 'Ali', 'Salem', '02'],
 ['7002', 'Ahmad', 'Ali', '02'],
 ['7003', 'Rasha', 'Shadi', '02']]

#### Mapping Data to Key-Value Pairs ( Student )

In [57]:
Srdd2=SRdd.map(lambda x:(x[3] , x[1]+' '+x[2]))

In [58]:
Srdd2.take(10)

[('01', 'Muath Omer'),
 ('01', 'Jehad Shadi'),
 ('01', 'Salma Ahmad'),
 ('01', 'Sara Ali'),
 ('01', 'Suha Jamil'),
 ('01', 'Omer Amer'),
 ('02', 'Jamil Khalid'),
 ('02', 'Ali Salem'),
 ('02', 'Ahmad Ali'),
 ('02', 'Rasha Shadi')]

#### Mapping Data to Key-Value Pairs ( Majors )

In [59]:
Mrdd=majors.map(lambda x: (x.split(",")[0],x.split(",")[1]))

In [60]:
majors.take(2)

['01,Computer Science', '02,Data Science']

In [61]:
Mrdd.take(10)

[('01', 'Computer Science'), ('02', 'Data Science'), ('03', 'CIS')]

#### Joining Two RDDs Based on Keys


In [62]:
RRDD=Srdd2.join(Mrdd)

In [63]:
RRDD.take(3)

[('01', ('Muath Omer', 'Computer Science')),
 ('01', ('Jehad Shadi', 'Computer Science')),
 ('01', ('Salma Ahmad', 'Computer Science'))]

#### Extracting Values from a Joined RDD


In [64]:
RRDD.map(lambda x:x[1]).collect()

[('Muath Omer', 'Computer Science'),
 ('Jehad Shadi', 'Computer Science'),
 ('Salma Ahmad', 'Computer Science'),
 ('Sara Ali', 'Computer Science'),
 ('Suha Jamil', 'Computer Science'),
 ('Omer Amer', 'Computer Science'),
 ('Jamil Khalid', 'Data Science'),
 ('Ali Salem', 'Data Science'),
 ('Ahmad Ali', 'Data Science'),
 ('Rasha Shadi', 'Data Science')]

##### End this Task