In [2]:
carsPath = "/user/s292129/data/exam_06_2020/Cars.txt"
carsFailuresPath = "/user/s292129/data/exam_06_2020/CarsFailures.txt"
out1 = "exam_0620_out1/"
out2 = "exam_0620_out2/"

In [3]:
carsRDD = sc.textFile(carsPath)
# schema: carID, Model, Company, City
failuresRDD = sc.textFile(carsFailuresPath).cache()
# schema: date, time, carID, FailureType

In [5]:
def filterYearsAndType(line):
    fields = line.split(',')
    carID = fields[2]
    date = fields[0]
    failureType = fields[3]
    if ((date.startswith("2017") or date.startswith("2018")) and \
        failureType == "Engine"):
        return True
    else:
        return False

filteredFailuresRDD = failuresRDD.filter(filterYearsAndType)

In [6]:
def mapIdFailuresPerYear(line):
    fields = line.split(",")
    date = fields[0]
    carID = fields[2]
    if (date.startswith('2017')):
        return (carID, (1, 0))
    elif (date.startswith('2018')):
        return (carID, (0, 1))
    
mappedRDD = filteredFailuresRDD.map(mapIdFailuresPerYear)

In [7]:
carID_numFailPerYear = mappedRDD.reduceByKey(lambda v1, v2: (v1[0]+v2[0], v1[1]+v2[1]))

In [8]:
filteredIncreasing = carID_numFailPerYear.filter(lambda pair: pair[1][1] > pair[1][0])

In [10]:
filteredIncreasing.collect()

[('Car15', (0, 9))]

In [9]:
carID_modelRDD = carsRDD.map(lambda line: (line.split(',')[0], line.split(',')[1]))

In [11]:
carID_modelRDD.join(filteredIncreasing).map(lambda pair: (pair[0], pair[1][0]))\
.saveAsTextFile(out1)


# Task B

In [13]:
carDates = failuresRDD.map(lambda line: (line.split(',')[2], line.split(',')[0]))

In [20]:
# This function is already provided 
from datetime import datetime, timedelta
def previousDate(mydate):
    currentDate=datetime.strptime(mydate,"%Y/%m/%d")
    prevDate=currentDate-timedelta(days=1)
    return prevDate.strftime("%Y/%m/%d")

In [14]:
carDatesDistinct = carDates.distinct()

In [19]:
carDatesDistinct.collect()

[('Car15', '2018/01/05'),
 ('Car15', '2018/01/06'),
 ('Car15', '2018/01/07'),
 ('Car15', '2018/01/08'),
 ('Car15', '2018/01/12'),
 ('Car12', '2017/01/06'),
 ('Car12', '2017/01/08'),
 ('Car15', '2018/01/09'),
 ('Car15', '2018/01/10'),
 ('Car15', '2018/01/11'),
 ('Car12', '2018/01/05'),
 ('Car12', '2018/01/06'),
 ('Car12', '2018/01/07'),
 ('Car12', '2018/01/08'),
 ('Car12', '2017/01/05'),
 ('Car12', '2017/01/07'),
 ('Car12', '2017/01/09')]

In [24]:
def emitTwoPairsPerFailure(pair):
    carID = pair[0]
    date = pair[1]
    listOfPairs = [((carID, date), 1), ((carID, previousDate(date)), 1)]
    return listOfPairs

mappedRDD = carDatesDistinct.flatMap(emitTwoPairsPerFailure)

In [25]:
carIDdate_numOfFailures = mappedRDD.reduceByKey(lambda v1, v2: v1+v2)

In [26]:
carIDdate_numOfFailures.collect()

[(('Car15', '2018/01/05'), 2),
 (('Car15', '2018/01/06'), 2),
 (('Car15', '2018/01/07'), 2),
 (('Car15', '2018/01/08'), 2),
 (('Car15', '2018/01/12'), 1),
 (('Car12', '2017/01/06'), 2),
 (('Car12', '2017/01/08'), 2),
 (('Car12', '2018/01/04'), 1),
 (('Car15', '2018/01/04'), 1),
 (('Car15', '2018/01/11'), 2),
 (('Car12', '2017/01/05'), 2),
 (('Car12', '2017/01/07'), 2),
 (('Car15', '2018/01/09'), 2),
 (('Car15', '2018/01/10'), 2),
 (('Car12', '2018/01/05'), 2),
 (('Car12', '2018/01/06'), 2),
 (('Car12', '2018/01/07'), 2),
 (('Car12', '2018/01/08'), 1),
 (('Car12', '2017/01/04'), 1),
 (('Car12', '2017/01/09'), 1)]

In [29]:
carIDdate_numOfFailures.filter(lambda pair: pair[1] == 2)\
.keys().saveAsTextFile(out2)

# SQL BASED SOLUTION - ERRATA

In [34]:
failuresDF = spark.read.load(carsFailuresPath,\
               format="csv",
               header=False,
               inferSchema=True).withColumnRenamed('_c0','date')\
.withColumnRenamed('_c1','time').withColumnRenamed('_c2','carID')\
.withColumnRenamed('_c2','failureType').createOrReplaceTempView('failures')

In [35]:
fail2017DF = spark.sql('SELECT carID, count(*) AS count17\
            FROM failures \
            WHERE date >= "2017/01/01" AND date <= "2017/12/31" \
            GROUP BY carID')

In [36]:
fail2018DF = spark.sql('SELECT carID, count(*) AS count18\
            FROM failures \
            WHERE date >= "2018/01/01" AND date <= "2018/12/31" \
            GROUP BY carID')

In [51]:
fail2017DF.show()

+-----+-------+
|carID|count17|
+-----+-------+
|Car12|      6|
+-----+-------+



In [49]:
increasingFailCarsDF = fail2017DF.join(fail2018DF,fail2017DF.carID==fail2018DF.carID)\
.filter("count17 < count18")

In [50]:
increasingFailCarsDF.show()

+-----+-------+-----+-------+
|carID|count17|carID|count18|
+-----+-------+-----+-------+
+-----+-------+-----+-------+



In [44]:
carsDF = spark.read.load(carsPath, format='csv', header=False, inferSchema=True)\
.withColumnRenamed('_c0','carID').withColumnRenamed('_c1','model')\


In [45]:
carsDF.show()

+-----+------+-----+-----+
|carID| model|  _c2|  _c3|
+-----+------+-----+-----+
|Car12| Panda|  FCA|Paris|
|Car15|Model1|Honda|Paris|
|Car16|Model2|  FCA|Paris|
|Car17|Model3|Honda| Rome|
+-----+------+-----+-----+



In [46]:
carsDF.createOrReplaceTempView('cars')

In [None]:
spark.sql('SELECT carID, model \
           FROM increasingCars JOIN cars ON increasingCars.carID = cars.carID \
           ').write.csv(out1,header=False)

# Task B - ERRATO

In [None]:
spark.sql('SELECT DISTINCT carID, date \
            FROM failures').createOrReplaceTempView('distinctfailures')

In [None]:
prevDateDF = spark.sql('SELECT carID, previousDate(date) AS prevDate \
                        FROM distinctfailures ').createOrReD('prevDateFailures')

In [None]:
spark.sql('SELECT carID, date \
           FROM failures JOIN prevDateFailures ON failures.carID=prevDateFailures.carID \
           AND failures.date=prevDateFailures.prevDate \
           GROUP BY carID, date \
           HAVING count(*)=2').write.csv(out2,header=False)