In [3]:
import pyspark
import random
sc = pyspark.SparkContext(appName="PiSOl")

In [4]:
housePath = "Houses.txt"
consumptionPath = "MonthlyWaterConsumption.txt"

outputPath1 = "outPart1/"
outputPath2 = "outPart2/"

In [5]:
# Define the rdds associated with the input files

# HID,City,Country,YearBuilt
houseRDD = sc.textFile(housePath)

# HID,Month,M3
# Month in YYYY/MM format
consumptionRDD = sc.textFile(consumptionPath)

In [6]:
#########################################
# PART 1
#########################################

In [7]:
# filter water consumptions only for years 2022 and 2021
def filter22_21(l):
    fields = l.split(",")
    month = fields[1]
    
    return month.startswith("2021") or month.startswith("2022")
    

filteredCons = consumptionRDD.filter(filter22_21)

In [8]:
# compute <the trimesters for each record
# to do that, we use division between integers, with months starting from 0 (january = 0, february = 1 etc)
# month // 3 = trimesterID associated to the record
# example: january // 3 = 0, february // 3 = 0, march // 3 = 0, april // 3 = 1 etc.
# We compute then an RDD with
# key = HID, trimesterID, year
# value = water consumption
# and then sum all the values to compute the water consumed in that trimester

def HidTrimYear_Cons(l):
    fields = l.split(",")
    hid = fields[0]
    yearMonth = fields[1]
    consumption = float(fields[2])
    
    f = yearMonth.split("/")
    year = int(f[0])
    month = int(f[1])
    trimester = (month-1) // 3
    
    return ((hid, trimester, year), consumption)
    

waterConsPerTrim = filteredCons\
                    .map(HidTrimYear_Cons)\
                    .reduceByKey(lambda v1, v2: v1 + v2)

In [9]:
# perform a map, obtaining
# key = HID, trimester
# value = (m3 in 2021, m3 in 2022)
# where one of the two entries is 0
# then, use a reduceByKey to sum the entries for each year (to obtain a single entry per house and per trimester)

def hidTrim_cons2122(t):
    hid = t[0][0]
    trim = t[0][1]
    year = t[0][2]
    m3 = t[1]
    
    if (year == 2021):
        return ((hid, trim), (m3, 0.))
    else:
        return ((hid, trim), (0., m3))
    


houseConsPerTrim = waterConsPerTrim\
                    .map(hidTrim_cons2122)\
                    .reduceByKey(lambda v1, v2: (v1[0] + v2[0], v1[1] + v2[1]))

In [10]:
# select the pairs with an increasing consumption (m3 2022>m3 2021)
# and map to pairs with key = HID, value = +1
# and count for each house the number of increasing consumption associated to the same trimester
# and filter with count >= 3
housesWithIncreasingCons = houseConsPerTrim\
                            .filter(lambda t: t[1][1] > t[1][0])\
                            .map(lambda t: (t[0][0], 1))\
                            .reduceByKey(lambda v1, v2: v1 + v2)\
                            .filter(lambda t: t[1] >= 3)

In [11]:
# Prepare a pairRDD with
# key = hid
# value = city

def hid_city(l):
    fields = l.split(",")
    hid = fields[0]
    city = fields[1]
    
    return (hid, city)
    

houseCity = houseRDD.map(hid_city)

In [12]:
# join the two RDDs to compute result1
# map to HID, City
res1 = houseCity.join(housesWithIncreasingCons)\
                .map(lambda t: (t[0], t[1][0]))

In [14]:
res1.saveAsTextFile(outputPath1)

Py4JJavaError: An error occurred while calling o120.saveAsTextFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/home/jovyan/work/2023_02_15/Spark/outPart1 already exists
	at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:131)
	at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.assertConf(SparkHadoopWriter.scala:299)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopDataset$1(PairRDDFunctions.scala:1091)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1089)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRDDFunctions.scala:1062)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1027)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$3(PairRDDFunctions.scala:1009)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1008)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$2(PairRDDFunctions.scala:965)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:963)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$2(RDD.scala:1620)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1620)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$1(RDD.scala:1606)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1606)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile(JavaRDDLike.scala:564)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile$(JavaRDDLike.scala:563)
	at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)


In [15]:
#########################################
# PART 2
#########################################

In [16]:
# compute for each year and houseID, the water consumption
# key = houseID, year
# value = water consumption

def HidYear_Cons(l):
    fields = l.split(",")
    hid = fields[0]
    yearMonth = fields[1]
    consumption = float(fields[2])
    
    f = yearMonth.split("/")
    year = int(f[0])
    
    return ((hid, year), consumption)
    

waterConsPerYearAndHouse = consumptionRDD\
                            .map(HidYear_Cons)\
                            .reduceByKey(lambda v1, v2: v1 + v2)

In [17]:
# Define windows of two consecutive year for each house
# flatMap each input to two pairs
#- key=(houseID, year  ), value=( yearly consumption, +1)
#- key=(houseID, year+1), value=(-1*yearly consumption, +1)

def elements(p):
    pairs = []
    
    hid = p[0][0]
    year = p[0][1]
    
    AnnualCons= p[1]
    
    # - key=(houseID, year  ), value=( yearly consumption, +1)
    pairs.append(((hid,year), (AnnualCons, 1)))
    
    # - key=(houseID, year+1), value=(-yearly consumption, +1)
    pairs.append(((hid,year+1), (-AnnualCons, 1)))
    
    return pairs
    

elementsWindows = waterConsPerYearAndHouse.flatMap(elements)

In [18]:
elementsWindows.collect()

[(('HID1', 2022), (1194.0, 1)),
 (('HID1', 2023), (-1194.0, 1)),
 (('HID2', 2021), (1391.0, 1)),
 (('HID2', 2022), (-1391.0, 1)),
 (('HID3', 2022), (1212.0, 1)),
 (('HID3', 2023), (-1212.0, 1)),
 (('HID4', 2022), (1159.0, 1)),
 (('HID4', 2023), (-1159.0, 1)),
 (('HID5', 2021), (1129.0, 1)),
 (('HID5', 2022), (-1129.0, 1)),
 (('HID6', 2021), (993.0, 1)),
 (('HID6', 2022), (-993.0, 1)),
 (('HID7', 2021), (1115.0, 1)),
 (('HID7', 2022), (-1115.0, 1)),
 (('HID8', 2021), (956.0, 1)),
 (('HID8', 2022), (-956.0, 1)),
 (('HID9', 2021), (1046.0, 1)),
 (('HID9', 2022), (-1046.0, 1)),
 (('HID10', 2021), (1221.0, 1)),
 (('HID10', 2022), (-1221.0, 1)),
 (('HID11', 2022), (1356.0, 1)),
 (('HID11', 2023), (-1356.0, 1)),
 (('HID12', 2022), (1298.0, 1)),
 (('HID12', 2023), (-1298.0, 1)),
 (('HID13', 2022), (1278.0, 1)),
 (('HID13', 2023), (-1278.0, 1)),
 (('HID14', 2022), (1093.0, 1)),
 (('HID14', 2023), (-1093.0, 1)),
 (('HID15', 2021), (904.0, 1)),
 (('HID15', 2022), (-904.0, 1)),
 (('HID16', 2021), 

In [20]:
# Compute for each window the number of elements and the sum of the consumptions 
# (each windows contains at most two values: one positive value for the current year
# and a negative value for the previous year)
windowsElementsAndSumCons = elementsWindows\
                            .reduceByKey(lambda p1, p2: (p1[0]+p2[0], p1[1]+p2[1]))

In [21]:
# Select the windows with two elements and a negative sum of consumption (decreasing consumption)
selectedWindows = windowsElementsAndSumCons\
                    .filter(lambda p: p[1][1]==2 and p[1][0]<0)

In [22]:
# Map each window to (HID, +1) and remove duplicates.

def HID_1(p):
    hid = p[0][0]
    
    return (hid, 1)
    

houseAtLeastOneDecrease = selectedWindows\
                        .map(HID_1)\
                        .distinct()

In [23]:
# Join the previously computed RDD with houseCityRDD to get information related to the city
# and obtain the resulting RDD with
# key = city
# value = +1
# and count the number of houses for each city for which an annual consumption decrease was recorded
cityHouseConsDecr = houseAtLeastOneDecrease\
                    .join(houseCity)\
                    .map(lambda t: (t[1][1], 1))\
                    .reduceByKey(lambda v1, v2: v1 + v2)

In [24]:
cityHouseConsDecr.collect()

[('City8', 1499),
 ('City1', 13158),
 ('City7', 1452),
 ('City3', 1490),
 ('City6', 1535),
 ('City2', 14253),
 ('City0', 12197),
 ('City9', 1514),
 ('City5', 1498),
 ('City4', 1491),
 ('Turin', 3),
 ('Barcelona', 1)]

In [25]:
# filter and keep only the cities with count 
# (number of houses with at least one annual decrease) > 2
citiesToDiscard = cityHouseConsDecr\
                    .filter(lambda t:t[1] > 2)\
                    .keys()

In [28]:
citiesToDiscard.collect()

['City8',
 'City1',
 'City7',
 'City3',
 'City6',
 'City2',
 'City0',
 'City9',
 'City5',
 'City4',
 'Turin']

In [26]:
# Select the cities with at most two houses with at least one annual decrease
res2 = houseCity\
        .map(lambda p: p[1])\
        .distinct()\
        .subtract(citiesToDiscard)

In [30]:
houseCity.map(lambda p: p[1])\
        .distinct().collect()

['City9',
 'City1',
 'City4',
 'City8',
 'City5',
 'Turin',
 'City7',
 'City3',
 'City2',
 'City6',
 'City0',
 'Nince',
 'Barcelona']

In [27]:
res2.collect()

['Nince', 'Barcelona']

In [None]:
res2.saveAsTextFile(outputPath2)