# Assignment 1
En este ejercicio, reduciremos datasets grandes, los cuales pueden tomar mucho tiempo. Debremos usar la lógica map-reduce como hemos aprendido en las lecciones.

Leed el directorio `activations_xml` en vuestro sistema.

Cada fichero XML contiene datos de todos los dispositivos activados por clientes durante los 12 meses de varios años (muchas activaciones por cada fichero). Un ejemplo: 
```xml
<activations>
	<activation timestamp="2013-1-18 11:53.000" type="phone">
		<account-number>454721</account-number>
		<device-id>d051735e-cd2d-11e8-a49d-b86b239563ce</device-id>
		<phone-number>66641135</phone-number>
		<model>WIKO</model>
	</activation>
	<activation timestamp="2013-1-21 03:55.000" type="phone">
		<account-number>239096</account-number>
		<device-id>d051735f-cd2d-11e8-a2c0-b86b239563ce</device-id>
		<phone-number>42743767</phone-number>
		<model>HTC</model>
	</activation>
    ...............
</activations>
```
Por conveniencia se suministran las funciones para parsear el XML, ya que ésto no será el foco del ejercicio. Estas funciones son:
```python
import xml.etree.ElementTree as ElementTree

# Given a string containing XML, parse the string, and 
# return an iterator of activation XML records (Elements) contained in the string
def getActivations(s):
    filetree = ElementTree.fromstring(s)
    return filetree.getiterator('activation')
    
# Given an activation record (XML Element), return the model name
def getModel(activation):
    return activation.find('model').text 

# Given an activation record (XML Element), return the account number 
def getAccount(activation):
    return activation.find('account-number').text 
```
Para procesar los ficheros se debe:
1. Usar `wholeTextFiles` para crear un RDD de activaciones.
2. Cada fichero XML puede contener muchos registros simples de activación. Utiliza transformaciones que asignen a cada activación simple a un registro del RDD, usando las funciones suministradas.
3. Mapear cada activación a un string con formato `account-number:model`.
4. Guarda los strings formateados en un fichero de texto.
5. Mapear cada activación a un registro clave-valor con formato (account-number,model) paa construir un `pairRDD` y contestar a las preguntas que se encuentran a continuación.

QUESTIONS:
1. Presentar las top 5 "accounts" con mayor número de activaciones.
2. Presentar las 5 "accounts" con menor número de activaciones.
3. Presentar los top 5 modelos con mayor numero de activaciones.
3. Cuántas "accounts" distintas han activado un modelo en estos años, independientemente del modelo?
4. Presentar cuántas "accounts" han activado un modelo para cada frecuencia (cuántos clientes han activado 1 modelo, 2 modelos, etc).
5. Presentar los modelos que han activado las top 5 "accounts" con mayor número de activaciones.
6. EXTRA: Cuántas "accounts" distintas han activado un modelo en estos años por modelo?
7. EXTRA: Qué "accounts" activaron cada modelo para cada frecuencia, es decir, para cada modelo qué cuentas activaron ese modelo 1 vez, 2 veces, etc.

In [2]:
import os
import sys

os.environ['SPARK_HOME'] = "C:/spark-2.3.3/spark-2.3.3-bin-hadoop2.7"
os.environ["JAVA_HOME"] = "C:/Java/jdk1.8.0_201"

# Create a variable for our root path
SPARK_HOME = os.environ['SPARK_HOME']

#Add the following paths to the system path. Please check your installation
#to make sure that these zip files actually exist. The names might change
#as versions change.
sys.path.insert(0,os.path.join(SPARK_HOME,"python"))
sys.path.insert(0,os.path.join(SPARK_HOME,"python","lib"))
sys.path.insert(0,os.path.join(SPARK_HOME,"python","lib","pyspark.zip"))
sys.path.insert(0,os.path.join(SPARK_HOME,"python","lib","py4j-0.10.7-src.zip"))

import findspark
findspark.init()

#Initialize SparkSession and SparkContext
from pyspark.sql import SparkSession

#Create a Spark Session
spark = SparkSession \
    .builder \
    .master("local[2]") \
    .appName("MiPrimer") \
    .config("spark.executor.memory", "6g") \
    .config("spark.cores.max","4") \
    .getOrCreate()


#Get the Spark Context from Spark Session    
sc = spark.sparkContext.getOrCreate()

In [3]:
import xml.etree.ElementTree as ElementTree

# Given a string containing XML, parse the string, and 
# return an iterator of activation XML records (Elements) contained in the string
def getActivations(s):
    filetree = ElementTree.fromstring(s)
    return filetree.getiterator('activation')

# Given an activation record (XML Element), return the model name
def getModel(activation):
    return activation.find('model').text 

# Given an activation record (XML Element), return the account number 
def getAccount(activation):
    return activation.find('account-number').text 

### 1. Use `wholeTextFiles()` para leer el directorio xml

In [4]:
data = sc.wholeTextFiles("C:/Users/Ivan_/_Jupyter/Spark/activations-xml/activations-xml/*")
data.take(1)

[('file:/C:/Users/Ivan_/_Jupyter/Spark/activations-xml/activations-xml/2013-01.xml',
  '<activations>\r\n\t<activation timestamp="2013-1-24 18:39.000" type="phone">\r\n\t\t<account-number>6845</account-number>\r\n\t\t<device-id>8126d422-f3c7-11e8-897c-b86b239563ce</device-id>\r\n\t\t<phone-number>6610058</phone-number>\r\n\t\t<model>PLUM</model>\r\n\t</activation>\r\n\t<activation timestamp="2013-1-11 19:38.000" type="phone">\r\n\t\t<account-number>361</account-number>\r\n\t\t<device-id>8127220c-f3c7-11e8-8a29-b86b239563ce</device-id>\r\n\t\t<phone-number>6240295</phone-number>\r\n\t\t<model>XIAOMI</model>\r\n\t</activation>\r\n\t<activation timestamp="2013-1-09 05:47.000" type="phone">\r\n\t\t<account-number>8724</account-number>\r\n\t\t<device-id>8127220d-f3c7-11e8-9e2a-b86b239563ce</device-id>\r\n\t\t<phone-number>6189944</phone-number>\r\n\t\t<model>LEECO</model>\r\n\t</activation>\r\n\t<activation timestamp="2013-1-04 18:58.000" type="phone">\r\n\t\t<account-number>5840</account-n

### 2. Usar una transformación para leer una activación en cada registro del RDD

In [5]:
Rdd = data.map(lambda x: x[1]).flatMap(lambda x: getActivations(x))
Rdd.take(5)

[<Element 'activation' at 0x0000021A7FDE8908>,
 <Element 'activation' at 0x0000021A7FDEDEA8>,
 <Element 'activation' at 0x0000021A7FE01098>,
 <Element 'activation' at 0x0000021A7FE01228>,
 <Element 'activation' at 0x0000021A7FE013B8>]

### 3. Mapear cada activación a un registro "account-number:model-name"

In [6]:
Rdd_2 = Rdd.map(lambda x: getAccount(x)+":"+getModel(x))
Rdd_2.take(5)

['6845:PLUM', '361:XIAOMI', '8724:LEECO', '5840:ACER', '6563:LG']

### 4. Guardar los datos a un fichero de texto

In [None]:
Rdd_2.saveAsTextFile ("C:\\Users\\Ivan_\\Downloads")

### 5. Construir un nuevo RDD, aplicando las transformaciones correspondientes para contestar a las preguntas

In [7]:
Rdd_3 = Rdd.map(lambda x: ((getAccount(x),1)))
Rdd_3.take(5)

[('6845', 1), ('361', 1), ('8724', 1), ('5840', 1), ('6563', 1)]

#### 5.1 Presentar las top 5 "accounts" con mayor número de activaciones

In [8]:
Copy_Rdd = Rdd_3.reduceByKey(lambda x,y: x+y).sortBy(lambda x: x[1],False)
Copy_Rdd.take(5)

[('3829', 12), ('2219', 12), ('4084', 12), ('5158', 11), ('3007', 11)]

#### 5.2 Presentar las 5 "accounts" con menor número de activaciones

In [9]:
Rdd_3.reduceByKey(lambda x,y: x+y).sortBy(lambda x: x[1]).take(5)

[('7055', 1), ('9704', 1), ('2086', 1), ('6493', 1), ('5537', 1)]

#### 5.3 Presentar los top 5 modelos con mayor numero de activaciones

In [10]:
Rdd_4 = Rdd.map(lambda x: ((getModel(x),1)))
Rdd_4.reduceByKey(lambda x,y: x+y).sortBy(lambda x: x[1],False).take(5)

[('HP', 1073), ('LEECO', 1064), ('LAVA', 1060), ('APPLE', 1058), ('HTC', 1042)]

#### 5.4 Cuántas "accounts" distintas han activado un modelo en estos años, independientemente del modelo?

In [11]:
Rdd_5 = Rdd.map(lambda x: (getAccount(x),1)).reduceByKey(lambda x,y: x+y).count()
Rdd_5

9731

#### 5.5 Presentar cuántas "accounts" han activado un modelo para cada frecuencia (cuántos clientes han activado 1 modelo, 2 modelos, etc)

In [327]:
Rdd_6 = Rdd.map(lambda x: (getAccount(x),1)).reduceByKey(lambda x,y: x+y).map(lambda x: (x[1],x[0])).countByKey()
Rdd_6

defaultdict(int,
            {1: 989,
             2: 1759,
             3: 2160,
             4: 1925,
             5: 1339,
             6: 798,
             7: 428,
             8: 219,
             9: 75,
             10: 25,
             11: 11,
             12: 3})

#### 5.6 Presentar los modelos que han activado las top 5 "accounts" con mayor número de activaciones.

In [261]:
Rdd_7 = Rdd.map(lambda x: (getAccount(x),getModel(x))).distinct().groupByKey().map(lambda x: (x[0],list(x[1])))\
.sortBy(lambda x: x[1],False)
copy = Rdd_7.join(Copy_Rdd).sortBy(lambda x: x[1][1],False)
copy.take(5)

[('3829',
  (['WIKO',
    'ALCATEL',
    'PANASONIC',
    'ONEPLUS',
    'MEIZU',
    'MOTOROLA',
    'ZTE',
    'VODAFONE',
    'XIAOMI',
    'YU'],
   12)),
 ('2219',
  (['VIVO',
    'TOSHIBA',
    'PLUM',
    'ALCATEL',
    'LG',
    'LAVA',
    'XIAOMI',
    'APPLE',
    'MAXWEST',
    'YU',
    'ACER'],
   12)),
 ('4084',
  (['ZTE',
    'YU',
    'HTC',
    'BLU',
    'MOTOROLA',
    'ONEPLUS',
    'OPPO',
    'LENOVO',
    'PANASONIC',
    'ALCATEL'],
   12)),
 ('5191',
  (['TOSHIBA',
    'ONEPLUS',
    'LENOVO',
    'HP',
    'PANASONIC',
    'VERYKOOL',
    'BLACKBERRY',
    'XIAOMI',
    'ACER',
    'APPLE'],
   11)),
 ('6192',
  (['HP',
    'SAMSUNG',
    'PANASONIC',
    'PLUM',
    'WIKO',
    'OPPO',
    'HTC',
    'MAXWEST',
    'ENERGIZER',
    'YU'],
   11))]

#### 5.7 EXTRA: Cuántas "accounts" distintas han activado un modelo en estos años por modelo?

In [267]:
Rdd_8 = Rdd.map(lambda x: (getModel(x),getAccount(x))).distinct().countByKey()
Rdd_8

defaultdict(int,
            {'ACER': 902,
             'ALCATEL': 920,
             'APPLE': 1002,
             'ASUS': 953,
             'BLACKBERRY': 963,
             'BLU': 965,
             'CAT': 936,
             'ENERGIZER': 953,
             'GOOGLE': 981,
             'HP': 1015,
             'HTC': 987,
             'HUAWEI': 922,
             'LAVA': 1009,
             'LEECO': 1011,
             'LENOVO': 961,
             'LG': 939,
             'MAXWEST': 903,
             'MEIZU': 965,
             'MICROMAX': 950,
             'MICROSOFT': 915,
             'MOTOROLA': 927,
             'NOKIA': 961,
             'ONEPLUS': 922,
             'OPPO': 965,
             'PANASONIC': 931,
             'PLUM': 959,
             'SAMSUNG': 968,
             'SONY': 979,
             'TOSHIBA': 958,
             'VERYKOOL': 923,
             'VIVO': 948,
             'VODAFONE': 925,
             'WIKO': 937,
             'XIAOMI': 896,
             'YU': 975,
             '

#### 5.8 EXTRA: Qué "accounts" activaron cada modelo para cada frecuencia, es decir, para cada modelo qué cuentas activaron ese modelo 1 vez, 2 veces, etc.

In [19]:
Rdd_9 = Rdd.map(lambda x: ((getModel(x),getAccount(x)),1)).reduceByKey(lambda x,y: x+y)\
.map(lambda x: (x[0][0],(x[0][1],x[1]))).groupByKey().map(lambda x: (x[0],list(x[1])))
Rdd_9.collect()

[('PLUM',
  [[('6845', 1),
    ('8262', 1),
    ('3229', 1),
    ('4616', 1),
    ('2389', 1),
    ('8462', 1),
    ('6846', 2),
    ('9033', 1),
    ('2653', 1),
    ('4967', 1),
    ('6796', 1),
    ('1650', 1),
    ('6080', 2),
    ('8691', 1),
    ('9267', 1),
    ('2548', 1),
    ('4865', 1),
    ('1913', 2),
    ('8935', 1),
    ('6864', 1),
    ('9147', 1),
    ('1803', 1),
    ('2254', 1),
    ('1866', 1),
    ('580', 2),
    ('5105', 1),
    ('3615', 1),
    ('3942', 1),
    ('7164', 1),
    ('3525', 1),
    ('6530', 1),
    ('2969', 1),
    ('9238', 1),
    ('4768', 1),
    ('198', 1),
    ('4928', 1),
    ('5123', 1),
    ('993', 1),
    ('3806', 1),
    ('3124', 1),
    ('1131', 1),
    ('515', 1),
    ('980', 1),
    ('4951', 1),
    ('2118', 1),
    ('947', 1),
    ('6118', 1),
    ('7883', 1),
    ('3296', 1),
    ('8179', 1),
    ('5997', 1),
    ('1347', 1),
    ('1463', 1),
    ('1661', 1),
    ('1576', 1),
    ('9581', 1),
    ('8324', 1),
    ('4850', 1),
    ('2779

In [49]:
sc.stop()