In [2]:
from pyspark.sql import *
from pyspark.sql.types import StructType, StructField, DoubleType, StringType
from pyspark import *
from pyspark.sql.functions import *
import pyspark.sql.functions as F


In [3]:
vehicles = StructType([
    StructField("vehicleId", StringType()),
    StructField("efficiency", DoubleType())
])


In [4]:
def init_spark_session() -> SparkSession:
    # TODO: add your code here
    spark = SparkSession.builder.master('local').appName('Faulty Vehicle Detection').getOrCreate()
    return spark


In [5]:
spark=init_spark_session()



In [6]:
def read_csv(input_path: str) -> DataFrame:
    # TODO: add your code here
    data = SparkSession.builder.getOrCreate().read.format("csv").option("header", "True").schema(vehicles).load(
        input_path)
    return data

In [7]:
data_observed=read_csv("spark_mock_text/data/vehicles_observed_efficiency.csv")

In [8]:
data_observed.describe()

DataFrame[summary: string, vehicleId: string, efficiency: string]

In [9]:
data_observed.show()

+---------+----------+
|vehicleId|efficiency|
+---------+----------+
|       v1|      23.0|
|       v1|      24.0|
|       v1|      26.0|
|       v2|      27.0|
|       v2|      28.0|
|       v2|      31.0|
|       v2|      32.0|
|       v3|      36.0|
|       v3|      34.0|
|       v3|      35.0|
|       v4|      38.0|
|       v4|      39.0|
|       v4|      41.0|
|       v4|      43.0|
|       v5|      46.0|
|       v5|      44.0|
|       v5|      47.0|
+---------+----------+



In [10]:
def calc_average_efficiency(observed: DataFrame) -> DataFrame:
    # TODO: add your code here
    data = observed.groupBy(observed.vehicleId).agg({"efficiency": "avg"}).withColumnRenamed('avg(efficiency)',
                                                                                             'fuelEfficiency')
    data = data.select(data.vehicleId, data.fuelEfficiency)
    return data

In [11]:
data_observed=calc_average_efficiency(data_observed)

In [12]:
data_observed.show()

+---------+------------------+
|vehicleId|    fuelEfficiency|
+---------+------------------+
|       v5|45.666666666666664|
|       v2|              29.5|
|       v1|24.333333333333332|
|       v3|              35.0|
|       v4|             40.25|
+---------+------------------+



In [13]:
data_released=read_csv("spark_mock_text/data/vehicles_required_efficiency.csv")

In [14]:
data_released.show()

+---------+----------+
|vehicleId|efficiency|
+---------+----------+
|       v1|      25.0|
|       v2|      30.0|
|       v3|      35.0|
|       v4|      40.0|
|       v5|      45.0|
+---------+----------+



In [23]:
def find_faulty_vehicles(avg_observed: DataFrame, required: DataFrame) -> DataFrame:
    avg_observed.show()
    required.show()
    df=avg_observed.join(required, avg_observed.vehicleId == required.vehicleId, "inner")
    df.show()
    df =df.withColumn("obs", round(df.fuelEfficiency- df.efficiency))
    df.show()
   
    df = df.filter(df.obs<=5)
    df.show()
    return df
    #df.select(df.vehicleId, df.faulty_effieicency)

In [24]:
df=find_faulty_vehicles(data_observed,data_released)

+---------+------------------+
|vehicleId|    fuelEfficiency|
+---------+------------------+
|       v5|45.666666666666664|
|       v2|              29.5|
|       v1|24.333333333333332|
|       v3|              35.0|
|       v4|             40.25|
+---------+------------------+

+---------+----------+
|vehicleId|efficiency|
+---------+----------+
|       v1|      25.0|
|       v2|      30.0|
|       v3|      35.0|
|       v4|      40.0|
|       v5|      45.0|
+---------+----------+

+---------+------------------+---------+----------+
|vehicleId|    fuelEfficiency|vehicleId|efficiency|
+---------+------------------+---------+----------+
|       v5|45.666666666666664|       v5|      45.0|
|       v2|              29.5|       v2|      30.0|
|       v1|24.333333333333332|       v1|      25.0|
|       v3|              35.0|       v3|      35.0|
|       v4|             40.25|       v4|      40.0|
+---------+------------------+---------+----------+

+---------+------------------+---------+---

In [31]:
def save_as(data: DataFrame, output_path: str) -> None:
    # TODO: add your code here
    rdd_data=data.rdd()
    rdd_data.repartition(3)
    rdd_data.saveAsTextFile(output_path)


In [32]:
save_as(df,'mock_e1')

TypeError: 'RDD' object is not callable