###This code is executed on Google Colab due to resource constraints. The necessary artifacts for this code are stored in a Git

In [None]:
#install necessary library using pip
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m5.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m15.8 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=dbff77dc45fffb35734c08d53ca00e68feff3a553b3f7b08af9366654fa2d68a
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspa

### Part 1: Spark RDD API


###Task 1: Solution

In [None]:
import csv
import requests
from tempfile import NamedTemporaryFile
from pyspark.sql import SparkSession

# Download the groceries transactions file from GitHub.
url = 'https://raw.githubusercontent.com/stedy/Machine-Learning-with-R-datasets/master/groceries.csv'
response = requests.get(url, timeout=60)
# Fail fast on an HTTP error instead of silently treating an error page as data.
response.raise_for_status()

# Persist the payload to a temporary file so Spark can read it by path.
# delete=False keeps the file on disk after the `with` block closes it; the
# explicit encoding avoids depending on the platform default.
with NamedTemporaryFile(mode='w', suffix='.csv', delete=False, encoding='utf-8') as f:
    f.write(response.text)

# Create (or reuse) the SparkSession.
spark = SparkSession.builder.getOrCreate()

# Every line of the file is one transaction holding a *variable* number of
# comma-separated products. spark.read.csv() fixes the column count from the
# first line (4 columns here) and drops the extra tokens of longer lines, so
# the file is read as plain text instead: one row per transaction, with the
# complete line in column 0 ("value").
df = spark.read.text(f.name)

# Print the first 10 transactions in full.
df.show(10, truncate=False)


+----------------+-------------------+--------------+--------------------+
|             _c0|                _c1|           _c2|                 _c3|
+----------------+-------------------+--------------+--------------------+
|    citrus fruit|semi-finished bread|     margarine|         ready soups|
|  tropical fruit|             yogurt|        coffee|                null|
|      whole milk|               null|          null|                null|
|       pip fruit|             yogurt| cream cheese |        meat spreads|
|other vegetables|         whole milk|condensed milk|long life bakery ...|
|      whole milk|             butter|        yogurt|                rice|
|      rolls/buns|               null|          null|                null|
|other vegetables|           UHT-milk|    rolls/buns|        bottled beer|
|      pot plants|               null|          null|                null|
|      whole milk|            cereals|          null|                null|
+----------------+-------

###Task 2a: Solution

In [None]:
import os

# Convert the DataFrame to an RDD of Rows.
rdd = df.rdd

# Collect the distinct products over all transactions.
# Every non-null field of a row is inspected (not only the first one) and split
# on commas, so the result is complete whether a row carries one product per
# column or the whole comma-separated transaction in a single column.
products = rdd.flatMap(
    lambda row: [
        item
        for field in row
        if field is not None
        for item in field.split(',')
        if item
    ]
).distinct()

# Make sure the output directory exists before writing into it.
os.makedirs('out', exist_ok=True)

# Write one product per line.
with open('out/out_1_2a.txt', 'w') as out_file:
    out_file.writelines(product + '\n' for product in products.collect())

###Task 2b: Solution

In [None]:
import os

# Collect the distinct products over all transactions.
# Every non-null field of a row is inspected (not only the first one) and split
# on commas, so products beyond the first column are counted as well.
products = rdd.flatMap(
    lambda row: [
        item
        for field in row
        if field is not None
        for item in field.split(',')
        if item
    ]
).distinct()

# Count the number of distinct products.
count = products.count()

# Make sure the output directory exists before writing into it.
os.makedirs('out', exist_ok=True)

# Write the count to a text file.
with open('out/out_1_2b.txt', 'w') as out_file:
    out_file.write('Count:\n')
    out_file.write(str(count))

###Task 3 Solution

In [None]:
import os

# Count how often each product is purchased.
# Every non-null field of a row is inspected (not only the first one) and split
# on commas, so products beyond the first column contribute to the counts.
product_counts = (
    rdd.flatMap(
        lambda row: [
            item
            for field in row
            if field is not None
            for item in field.split(',')
            if item
        ]
    )
    .map(lambda product: (product, 1))
    .reduceByKey(lambda left, right: left + right)
)

# Take the 5 most frequent products without sorting the whole RDD.
# Ties on frequency are broken alphabetically so the result is deterministic.
top_5 = product_counts.takeOrdered(5, key=lambda pair: (-pair[1], pair[0]))

# Make sure the output directory exists before writing into it.
os.makedirs('out', exist_ok=True)

# Write the top 5 products and their frequencies to a text file.
# The loop variable is named `frequency` so it does not overwrite the distinct
# product `count` computed in the previous task.
with open('out/out_1_3.txt', 'w') as out_file:
    for product, frequency in top_5:
        out_file.write("('{}', {})\n".format(product, frequency))

### Part 2: Spark DataFrame API

###Task 1 Solution

In [None]:
import pyspark
from pyspark.sql import SparkSession

# Location of the snappy-compressed parquet part file used throughout Part 2.
parquet_path = '/content/src/part-00000-tid-4320459746949313749-5c3d407c-c844-4016-97ad-2edec446aa62-6688-1-c000.snappy.parquet'

# Obtain the active SparkSession, creating one if none exists yet.
spark = SparkSession.builder.getOrCreate()

# Load the parquet file into a DataFrame.
df = spark.read.parquet(parquet_path)

# Inspect the column names and types, then preview the first 10 rows.
df.printSchema()
df.show(10)


root
 |-- host_is_superhost: string (nullable = true)
 |-- cancellation_policy: string (nullable = true)
 |-- instant_bookable: string (nullable = true)
 |-- host_total_listings_count: double (nullable = true)
 |-- neighbourhood_cleansed: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- property_type: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- accommodates: double (nullable = true)
 |-- bathrooms: double (nullable = true)
 |-- bedrooms: double (nullable = true)
 |-- beds: double (nullable = true)
 |-- bed_type: string (nullable = true)
 |-- minimum_nights: double (nullable = true)
 |-- number_of_reviews: double (nullable = true)
 |-- review_scores_rating: double (nullable = true)
 |-- review_scores_accuracy: double (nullable = true)
 |-- review_scores_cleanliness: double (nullable = true)
 |-- review_scores_checkin: double (nullable = true)
 |-- review_scores_communication: double (nullable = true

###Task 2 Solution


In [None]:
import pyspark.sql.functions as F

# Compute the minimum price, maximum price, and total row count in a single
# aggregation (one pass over the data instead of three separate jobs).
price_stats = df.agg(
    F.min('price').alias('min_price'),
    F.max('price').alias('max_price'),
    F.count('*').alias('row_count'),
).first()
min_price, max_price, row_count = price_stats

# Create a one-row DataFrame holding the three statistics.
output_df = spark.createDataFrame(
    [(min_price, max_price, row_count)],
    ['min_price', 'max_price', 'row_count'],
)

# Write the DataFrame as CSV. Spark creates a directory of part files at this
# path; mode='overwrite' lets the cell be re-run, whereas the default mode
# fails once the path already exists.
output_df.write.csv('out/out_2_2.txt', header=True, mode='overwrite')

###Task 3 Solution

In [None]:
import pyspark.sql.functions as F

# Keep only properties with a price > 5000 and a review score of exactly 10.
# NOTE(review): the score is taken from `review_scores_rating`; the schema also
# lists other review_scores_* columns - confirm this is the one the task means.
filtered_df = df.where((df.price > 5000) & (df.review_scores_rating == 10))

# Compute both averages in a single aggregation (one job instead of two).
averages = filtered_df.agg(F.avg('bathrooms'), F.avg('bedrooms')).first()

# avg() yields None when no row contributes a value (e.g. the filter matched
# nothing); report 0 in that case.
avg_bathrooms = averages[0] if averages[0] is not None else 0
avg_bedrooms = averages[1] if averages[1] is not None else 0

# Create a one-row DataFrame with the average number of bathrooms and bedrooms.
output_df = spark.createDataFrame(
    [(avg_bathrooms, avg_bedrooms)],
    ['avg_bathrooms', 'avg_bedrooms'],
)

# Write the DataFrame as CSV. Spark creates a directory of part files at this
# path; mode='overwrite' lets the cell be re-run, whereas the default mode
# fails once the path already exists.
output_df.write.csv('out/out_2_3.txt', header=True, mode='overwrite')

###Task 4 Solution

In [None]:
import pyspark.sql.functions as F

import os

# Pick the property with the lowest price, breaking price ties by the highest
# rating. Intersecting "globally cheapest" with "globally best rated" is empty
# whenever no single property is both, and first() on an empty DataFrame
# returns None; ordering always yields a row for a non-empty DataFrame and
# selects a member of that intersection whenever it is non-empty.
# Nulls are pushed last so a missing price or rating never wins.
best_property = df.orderBy(
    F.col('price').asc_nulls_last(),
    F.col('review_scores_rating').desc_nulls_last(),
).first()

if best_property is None:
    raise ValueError('Cannot pick a property: the DataFrame contains no rows')

# Number of people that can be accommodated by the selected property.
people_accommodated = best_property['accommodates']

# Make sure the output directory exists before writing into it.
os.makedirs('out', exist_ok=True)

# Write the number of people that can be accommodated to a text file.
with open('out/out_2_4.txt', 'w') as out_file:
    out_file.write(str(people_accommodated))


###Task 5 Solution

In [None]:
%%writefile src/task_2_5.py

# Import the required libraries
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta

# Default parameters applied to every task of the DAG: owner, first logical
# date, retry policy, and no dependency on the previous run's outcome.
default_args = {
    'owner': 'me',
    'start_date': datetime(2023, 1, 1),
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

# Create the DAG; it is scheduled once per day.
dag = DAG(
    'task_2_5',
    default_args=default_args,
    schedule_interval=timedelta(days=1)
)

# Create one placeholder (no-op) operator per task.
task1 = DummyOperator(task_id='task_1', dag=dag)
task2 = DummyOperator(task_id='task_2', dag=dag)
task3 = DummyOperator(task_id='task_3', dag=dag)
task4 = DummyOperator(task_id='task_4', dag=dag)
task5 = DummyOperator(task_id='task_5', dag=dag)
task6 = DummyOperator(task_id='task_6', dag=dag)

# Set task dependencies:
#   task_1 -> (task_2, task_3) -> (task_4, task_5, task_6)
# `task1 >> [task2, task3]` evaluates to the plain Python list [task2, task3],
# and Python lists do not implement `>>`, so chaining a second list onto it
# raises TypeError when the DAG file is parsed. Each upstream task is therefore
# wired to the downstream list individually.
task1 >> [task2, task3]
for upstream_task in (task2, task3):
    upstream_task >> [task4, task5, task6]


Writing src/task_2_5.py


### Part 3: Applied Machine Learning


###Task 1 Solution

###Machine Learning Code

In [None]:
# Download the iris data set to /tmp/iris.csv.
# --fail makes curl exit with an error on an HTTP failure instead of saving the
# server's error page as if it were the data set.
!curl -L --fail "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data" -o /tmp/iris.csv

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100  4551  100  4551    0     0  39573      0 --:--:-- --:--:-- --:--:-- 39573


In [None]:
import numpy as np 
import pandas as pd 
from sklearn.linear_model import LogisticRegression 
# Column names to assign, in file order; they are passed to read_csv explicitly.
iris_columns = ["sepal_length", "sepal_width", "petal_length", "petal_width", "class"]

# Load the downloaded file into a pandas DataFrame and preview the first rows.
df = pd.read_csv("/tmp/iris.csv", names=iris_columns)
df.head()

Unnamed: 0,sepal_length,sepal_width,petal_length,petal_width,class
0,5.1,3.5,1.4,0.2,Iris-setosa
1,4.9,3.0,1.4,0.2,Iris-setosa
2,4.7,3.2,1.3,0.2,Iris-setosa
3,4.6,3.1,1.5,0.2,Iris-setosa
4,5.0,3.6,1.4,0.2,Iris-setosa


In [None]:
# Split the table into the measurements (first four columns) and the class
# label (fifth column).
array = df.values
X, y = array[:, :4], array[:, 4]

# Train a logistic regression classifier on the full data set.
logreg = LogisticRegression(C=1e5)
logreg.fit(X, y)

LogisticRegression(C=100000.0)

In [None]:
# Sanity check: predict two rows taken from the training data.
# Expected classes, in order: Iris-setosa, Iris-virginica.
known_samples = (
    [5.1, 3.5, 1.4, 0.2],
    [6.2, 3.4, 5.4, 2.3],
)
for sample in known_samples:
    print(logreg.predict([sample]))

['Iris-setosa']
['Iris-virginica']


### PySpark Code

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import SparkSession

from pyspark.sql.types import StructType, StructField, DoubleType, StringType

# Reuse the running SparkSession if there is one, otherwise start one named "Classifier".
spark = SparkSession.builder.appName("Classifier").getOrCreate()

# Schema for the iris file: four nullable double measurements followed by the
# nullable string class label.
measurement_columns = ["sepal_length", "sepal_width", "petal_length", "petal_width"]
schema = StructType(
    [StructField(column, DoubleType(), True) for column in measurement_columns]
    + [StructField("class", StringType(), True)]
)

# Read the file (no header row) with the explicit schema and preview it.
df = spark.read.csv("/tmp/iris.csv", header=False, schema=schema)
df.show()



+------------+-----------+------------+-----------+-----------+
|sepal_length|sepal_width|petal_length|petal_width|      class|
+------------+-----------+------------+-----------+-----------+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|
|         4.7|        3.2|         1.3|        0.2|Iris-setosa|
|         4.6|        3.1|         1.5|        0.2|Iris-setosa|
|         5.0|        3.6|         1.4|        0.2|Iris-setosa|
|         5.4|        3.9|         1.7|        0.4|Iris-setosa|
|         4.6|        3.4|         1.4|        0.3|Iris-setosa|
|         5.0|        3.4|         1.5|        0.2|Iris-setosa|
|         4.4|        2.9|         1.4|        0.2|Iris-setosa|
|         4.9|        3.1|         1.5|        0.1|Iris-setosa|
|         5.4|        3.7|         1.5|        0.2|Iris-setosa|
|         4.8|        3.4|         1.6|        0.2|Iris-setosa|
|         4.8|        3.0|         1.4| 

In [None]:
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import LogisticRegression

# Convert the string column "class" to a numerical label column "classIndex".
stringIndexer = StringIndexer(inputCol="class", outputCol="classIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)

# Combine the four measurement columns into a single "features" vector column.
assembler = VectorAssembler(inputCols=["sepal_length", "sepal_width", "petal_length", "petal_width"], outputCol="features")

# Add the "features" column to the indexed DataFrame.
features = assembler.transform(indexed)

# Split the data into training and testing sets.
# The fixed seed makes the split, and therefore the fitted model, reproducible
# across runs.
training, test = features.randomSplit([0.7, 0.3], seed=42)

# Create the logistic regression estimator.
lr = LogisticRegression(featuresCol="features", labelCol="classIndex", maxIter=10)

# Fit the model to the training data.
lrModel = lr.fit(training)


In [None]:
from pyspark.ml.linalg import Vectors
# Map numerical predictions back to class names.
# The mapping is taken from the fitted StringIndexer model (`model.labels` is
# ordered by the index each label was assigned) instead of being hard-coded, so
# it stays correct if the indexer ever orders the labels differently.
class_mapping = dict(enumerate(model.labels))

# Predict the class index for two known samples.
cl1_prediction = lrModel.predict(Vectors.dense([5.1, 3.5, 1.4, 0.2]))
cl2_prediction = lrModel.predict(Vectors.dense([6.2, 3.4, 5.4, 2.3]))

# predict() returns the index as a float; convert it to int for the lookup.
pred_cl1 = class_mapping[int(cl1_prediction)]
pred_cl2 = class_mapping[int(cl2_prediction)]
print(pred_cl1)
print(pred_cl2)

Iris-setosa
Iris-virginica


###Task 2 Solution

In [None]:
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import LogisticRegression

# Convert the string column "class" to a numerical label column "classIndex".
stringIndexer = StringIndexer(inputCol="class", outputCol="classIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)

# Combine the four measurement columns into a single "features" vector column.
assembler = VectorAssembler(inputCols=["sepal_length", "sepal_width", "petal_length", "petal_width"], outputCol="features")

# Add the "features" column to the indexed DataFrame.
features = assembler.transform(indexed)

# Split the data into training and testing sets.
# The fixed seed makes the split, and therefore the fitted model and the
# predictions written out below, reproducible across runs.
training, test = features.randomSplit([0.7, 0.3], seed=42)

# Create the logistic regression estimator.
lr = LogisticRegression(featuresCol="features", labelCol="classIndex", maxIter=10)

# Fit the model to the training data.
lrModel = lr.fit(training)

# Build a DataFrame holding the two samples to classify.
pred_data = spark.createDataFrame(
    [(5.1, 3.5, 1.4, 0.2),
     (6.2, 3.4, 5.4, 2.3)],
    ["sepal_length", "sepal_width", "petal_length", "petal_width"])

# Apply the same VectorAssembler so the samples get a "features" column.
pred_features = assembler.transform(pred_data)

# Generate predictions for the samples with the fitted model.
predictions = lrModel.transform(pred_features)

# Show the predictions.
predictions.show()


+------------+-----------+------------+-----------+-----------------+--------------------+--------------------+----------+
|sepal_length|sepal_width|petal_length|petal_width|         features|       rawPrediction|         probability|prediction|
+------------+-----------+------------+-----------+-----------------+--------------------+--------------------+----------+
|         5.1|        3.5|         1.4|        0.2|[5.1,3.5,1.4,0.2]|[17.1574759755653...|[0.99989880750343...|       0.0|
|         6.2|        3.4|         5.4|        2.3|[6.2,3.4,5.4,2.3]|[-12.742254221498...|[1.58323522338701...|       2.0|
+------------+-----------+------------+-----------+-----------------+--------------------+--------------------+----------+



In [None]:
def _with_class_name(row):
    """Pair a prediction row's numeric label with its class name."""
    label = row["prediction"]
    return (label, class_mapping[label])


# Replace the prediction rows with (prediction, class) pairs.
predictions = predictions.rdd.map(_with_class_name).toDF(["prediction", "class"])

# Write the labelled predictions as CSV, replacing any previous output.
predictions.write.csv("out/out_3_2.txt", header=True, mode="overwrite")
