In [52]:
from os import truncate
import findspark
from pyspark import SparkContext, SparkConf, SQLContext
import csv
import geopandas as gpd
from pyspark.sql.functions import expr, lit, udf
from shapely import wkt
from pyspark.ml.feature import MinMaxScaler, VectorAssembler, StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.types import DoubleType
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
import lightgbm as lgb

findspark.init()  # Con este no me tira error de JVM.

In [53]:
def context():
    """Environment context.
    """

    # Naming the Master and de app.
    conf = SparkConf().setMaster('local').setAppName('Tarea Analisis de'
                                                     'BigData')

    # Starting Spark Cluster.
    sc = SparkContext.getOrCreate(conf=conf)

    # Starting SqlContext from sc.
    sqlContext = SQLContext(sc)
    return sqlContext


sqlContext = context()

FilePath = '/home/rafa/Dropbox/Linux_MDS/BDAnalytics/sprint1/data'

In [54]:
def open_files(sqlContext, FilePath):
    """Opening files.

    Args:
        sqlContext (Context): Pyspark environment.
        FilePath (String): Path where the files are.

    Returns:
        Spark dataframe: Spark dataframes of 2017, 2018 and 2019.
    """
    FileName1 = 'wifi_2017.csv'
    FileName2 = 'wifi_2018.csv'
    FileName3 = 'wifi_2019.csv'

    FullPath1 = FilePath + '/' + FileName1
    FullPath2 = FilePath + '/' + FileName2
    FullPath3 = FilePath + '/' + FileName3

    df_2017 = sqlContext.read.csv(FullPath1, header=True)
    df_2018 = sqlContext.read.csv(FullPath2, header=True)
    df_2019 = sqlContext.read.csv(FullPath3, header=True)
    return df_2017, df_2018, df_2019


df_2017, df_2018, df_2019 = open_files(sqlContext, FilePath)

In [55]:
def clean_2017(df_2017):
    """Cleaning 2017.

    Args:
        df_2017 (Spark dataframe): Spark dataframe than contain 2017.
    """
    df_2017 = df_2017.withColumnRenamed('data_source', 'data')

    df_2017 = df_2017.drop('range', 'created')

# Sorting columns.
    df_2017 = df_2017.select('id', 'bssid', 'lat', 'lon', 'updated', 'data')
    return df_2017


df_2017 = clean_2017(df_2017)

print('==============')
print('Sprint 1')
print('==============')

Sprint 1


In [56]:
def union_original_df(df_2017, df_2018, df_2019):
    """Applying union to all df's.

    Args:
        df_2017 (Spark dataframe): It's contain data regarding 2017.
        df_2018 (Spark dataframe): It's contain data regarding 2018.
        df_2019 (Spark dataframe): It's contain data regarding 2019.

    Returns:
        Spark dataframe: Union between above df's.
    """
    df_unidos = ((df_2017.union(df_2018)).union(df_2019)).distinct()
    print('All csv files dataframe:\n')
    df_unidos.show(truncate=False)
    df_unidos = df_unidos.drop('updated', 'data')
    return df_unidos

In [57]:
df_unidos = union_original_df(df_2017, df_2018, df_2019)

All csv files dataframe:

+----+------------+-----------+-----------+----------+----+
|id  |bssid       |lat        |lon        |updated   |data|
+----+------------+-----------+-----------+----------+----+
|802 |00040E94ED73|50.08081200|8.28433922 |1428513555|0   |
|1076|00095BDC1C2A|53.68903505|9.99661764 |1428513555|0   |
|1198|000B6B2F0FAC|49.68090465|8.62257210 |1428513555|0   |
|1304|000B850DA2B1|49.39064487|8.63899747 |1428513555|0   |
|1371|00040EE37419|49.66931085|9.00361495 |1428513555|0   |
|1484|00040ED8D824|51.16131842|13.47315431|1428513555|0   |
|1893|0001E30E688B|49.54164425|8.63952318 |1428513555|0   |
|2077|00040E8F92CD|50.93837781|6.94276197 |1428513555|0   |
|2586|000CF67F25B8|49.49755490|8.55307102 |1428513555|0   |
|2785|000FBB11D5A8|48.37995887|10.92875719|1428513555|0   |
|2971|000FB5B82AFC|50.07127597|8.43380130 |1428513555|0   |
|3085|000352A70940|50.10473795|8.66833063 |1428513555|0   |
|3202|00040EEDE668|53.56497360|9.80239220 |1428513555|0   |
|3437|00040EDA

In [58]:
def santiago_only(df_unidos):
    """It's get only Santiago city from dataset.

    Args:
        df_unidos (Spark dataframe): It's contain all data.

    Returns:
        Spark Dataframe: Spark dataframe that only contain Santiago.
    """
    f1_fabricante = df_unidos.filter((df_unidos.lat >= -33.65) &
                                     (df_unidos.lat <= -33.28) &
                                     (df_unidos.lon >= -70.81) &
                                     (df_unidos.lon <= -70.50))
    return f1_fabricante

In [59]:
f1_fabricante = santiago_only(df_unidos)

In [60]:
def mac_maker(f1_fabricante):
    """It's create two new columns with Id_fabricante and Media_mac.

    Args:
        f1_fabricante (Spark dataframe): Spark dataframe that only contain
        Santiago.

    Returns:
        Spark dataframe: Spark dataframe than contain in two separated columns
        the Id_fabricante and Media_mac
    """
    f1_fabricante = f1_fabricante.\
        withColumn('Id_fabricante',
                   expr('substring(bssid,1,length(bssid)-6)'))\
        .withColumn('Media_mac', expr('substring(bssid,7,length(bssid)-6)')).\
        drop('bssid')

    print('Santiago´s dataframe:\n')
    f1_fabricante.show()
    return f1_fabricante

In [61]:
f1_fabricante = mac_maker(f1_fabricante)

Santiago´s dataframe:

+-------+------------+------------+-------------+---------+
|     id|         lat|         lon|Id_fabricante|Media_mac|
+-------+------------+------------+-------------+---------+
|8849791|-33.39275272|-70.57426126|       1CE6C7|   F010E6|
|8849792|-33.39250351|-70.57437245|       1CE6C7|   F010E5|
|8850002|-33.39918647|-70.58116012|       001EE5|   3444A9|
|8850139|-33.39342478|-70.57411691|       64E950|   33D911|
|8850625|-33.40811106|-70.58709986|       FC94E3|   0C3C64|
|8851402|-33.39346036|-70.57409400|       34DBFD|   BFF298|
|8991482|-33.40576570|-70.56381641|       001DCF|   203FD1|
|8991542|-33.40188118|-70.56105160|       2CAB25|   85EF3F|
|8991749|-33.39870261|-70.55496093|       004F62|   1D5873|
|8992564|-33.41227703|-70.57368460|       40CBA8|   CCBD9D|
|8992611|-33.40116664|-70.56038709|       B4750E|   38BC6E|
|8993378|-33.42087583|-70.56632524|       14ABF0|   DD5A70|
|8994248|-33.41674528|-70.57156504|       70105C|   7CD8F0|
|8994782|-33.4031

In [62]:
# Full path that contain oui.txt
oui_path = '/home/rafa/Dropbox/Linux_MDS/BDAnalytics/sprint1/data/oui.txt'

In [63]:
def dict_maker_id():
    """It's generate a dictionary that contains the id and the makers names.

    Returns:
        Dictionary: It's contain the id and the makers name.
    """
    dict_vendor_id = dict()

    for lig in open(oui_path):
        if 'base 16' in lig:
            num, sep, txt = lig.strip().partition('(base 16)')
            dict_vendor_id[num.strip()] = txt.strip()
    return dict_vendor_id

In [64]:
dict_vendor_id = dict_maker_id()

In [65]:
# Creating a csv file that contain the Id_Fabricante and Media_mac
csv_path = '/home/rafa/Dropbox/Linux_MDS/BDAnalytics/sprint1/oui.csv'

In [66]:
def to_csv(dict_vendor_id):
    """Converting dictionary to csv.

    Args:
        dict_vendor_id (Dictionary): It's contain the vendor id and the name.
    """
    with open(csv_path, 'w') as f:
        w = csv.writer(f)
        w.writerows(dict_vendor_id.items())

In [67]:
to_csv(dict_vendor_id)

In [68]:
def oui_dataframe(sqlContext):
    """It's create the oui spark dataframe

    Args:
        sqlContext (context): Pyspark environment.

    Returns:
        Spark dataframe: It's contain the id and the name of the makers.
    """
    df_oui = sqlContext.read.csv(csv_path, header=False)
    print('OUI.txt´s dataframe:\n')
    df_oui.show(truncate=False)
    return df_oui

In [69]:
df_oui = oui_dataframe(sqlContext)

OUI.txt´s dataframe:

+------+------------------------------------------------------------+
|_c0   |_c1                                                         |
+------+------------------------------------------------------------+
|002272|American Micro-Fuel Device Corp.                            |
|00D0EF|IGT                                                         |
|086195|Rockwell Automation                                         |
|F4BD9E|Cisco Systems, Inc                                          |
|5885E9|Realme Chongqing MobileTelecommunications Corp Ltd          |
|BC2392|BYD Precision Manufacture Company Ltd.                      |
|405582|Nokia                                                       |
|A4E31B|Nokia                                                       |
|D89790|Commonwealth Scientific and Industrial Research Organisation|
|883A30|Aruba, a Hewlett Packard Enterprise Company                 |
|B8A58D|Axe Group Holdings Limited                                  

In [70]:
# Full path for shape file Manzana Precensal.
shape_path = ('/home/rafa/Dropbox/Linux_MDS/BDAnalytics/sprint1/data/'
              'Manzana_Precensal.shp')

In [71]:
def shape_file():
    """ Open and clean.
    """
    Manzana_Precensal = gpd.read_file(shape_path)

    # Drop unnecessary columns.
    Manzana_Precensal = Manzana_Precensal.drop(['DES_REGI', 'MANZENT',
                                                'COMUNA',
                                                'PROVINCIA', 'DES_PROV',
                                                'REGION', 'COD_DIS'], axis=1)

    print('Shape file')
    print(Manzana_Precensal)
    return Manzana_Precensal


Manzana_Precensal = shape_file()

print('==============')
print('Sprint 2')
print('==============')

Shape file
       COD_ZON  COD_ENT   DES_COMU  \
0            3        1  CERRILLOS   
1            3        4  CERRILLOS   
2            3        5  CERRILLOS   
3            3        8  CERRILLOS   
4            3        9  CERRILLOS   
...        ...      ...        ...   
53048        1      504   PEÑAFLOR   
53049        1      503   PEÑAFLOR   
53050        1      502   PEÑAFLOR   
53051        1      505   PEÑAFLOR   
53052        1      501   PEÑAFLOR   

                                                geometry  
0      POLYGON ((-70.72166 -33.47385, -70.72197 -33.4...  
1      POLYGON ((-70.72136 -33.47420, -70.72160 -33.4...  
2      POLYGON ((-70.72105 -33.47455, -70.72124 -33.4...  
3      POLYGON ((-70.72079 -33.47509, -70.72089 -33.4...  
4      POLYGON ((-70.72052 -33.47563, -70.72052 -33.4...  
...                                                  ...  
53048  POLYGON ((-70.87621 -33.61954, -70.87751 -33.6...  
53049  POLYGON ((-70.87723 -33.61920, -70.87741 -33.6...  
5

In [72]:
def future_georef(sqlContext, f1_fabricante, df_oui, Manzana_Precensal):
    """It's get the geo future.

    Args:
        sqlContext (context): Pyspark environment
        f1_fabricante (Spark dataframe): Spark dataframe that only contain
        Santiago.
        df_oui (Dataframe): It's contain the Id_fabricante and the name.
        Manzana_Precensal (Shape file): Georeference dataframe.

    Returns:
        Spark dataframe: Spark dataframe that contain the geo future.
    """

    # Join the df_stgo and df_oui through a join function and also make where
    # manufacturer_id is identical to _c0 of the df_oui.
    f1_fabricante = f1_fabricante.join(df_oui).\
        where(f1_fabricante["Id_fabricante"] == df_oui["_c0"])

    f1_fabricante = f1_fabricante.drop('_c0')

    f1_fabricante = f1_fabricante.withColumnRenamed('_c1', 'Fabricante')
    print('First future dataframe:\n')
    f1_fabricante.show(truncate=False)

    # Tranform df_stgo to pandas dataframe to work with geopandas.
    df_stgo_pandas = f1_fabricante.toPandas()

    # Creating a geopandas to indicate lat and lon points.
    df_stgo_geop = gpd.GeoDataFrame(df_stgo_pandas,
                                    geometry=gpd.points_from_xy
                                    (df_stgo_pandas.lon, df_stgo_pandas.lat))

    df_stgo_geop = df_stgo_geop.drop(columns=['lat', 'lon'])

    # CRS.
    df_stgo_geop.crs = 'EPSG:4674'

    # Joining with geopandas  df_stgo_geop and Manzana_Precensal.
    join_stgo_manzana = gpd.sjoin(df_stgo_geop, Manzana_Precensal, op='within',
                                  how='inner')

    join_stgo_manzana = join_stgo_manzana.drop(columns=['index_right'])
    join_stgo_manzana['str_geom'] = join_stgo_manzana.geometry.\
        apply(lambda x: wkt.dumps(x))
    join_stgo_manzana = (join_stgo_manzana.drop(columns=['geometry'])
                         ).rename(columns={'str_geom': 'geometry',
                                           'COD_ZON': 'Zona_Censal',
                                           'COD_ENT': 'Manzana_Censal',
                                           'DES_COMU': 'Comuna'})

    # Converting to pyspark.
    f1_georeferencia = sqlContext.createDataFrame(join_stgo_manzana)

    print('Dataframe with geo features\n')
    f1_georeferencia.show(truncate=False)
    return f1_georeferencia

In [73]:
df_union2 = future_georef(sqlContext, f1_fabricante, df_oui,
                          Manzana_Precensal)

First future dataframe:

+-------+------------+------------+-------------+---------+--------------------------------------------+
|id     |lat         |lon         |Id_fabricante|Media_mac|Fabricante                                  |
+-------+------------+------------+-------------+---------+--------------------------------------------+
|8849791|-33.39275272|-70.57426126|1CE6C7       |F010E6   |Cisco Systems, Inc                          |
|8849792|-33.39250351|-70.57437245|1CE6C7       |F010E5   |Cisco Systems, Inc                          |
|8850002|-33.39918647|-70.58116012|001EE5       |3444A9   |Cisco-Linksys, LLC                          |
|8850139|-33.39342478|-70.57411691|64E950       |33D911   |Cisco Systems, Inc                          |
|8850625|-33.40811106|-70.58709986|FC94E3       |0C3C64   |Technicolor CH USA Inc.                     |
|8851402|-33.39346036|-70.57409400|34DBFD       |BFF298   |Cisco Systems, Inc                          |
|8991482|-33.40576570|-70.5638

In [74]:
def quantity_proportion(sqlContext, f1_georeferencia):
    """It's get the quantity and proportion of all wifi makers plus the above
    future.

    Args:
        sqlContext (context): Pyspark environment.
        f1_georeferencia (Spark dataframe): Spark dataframe that contain the
        geo future.

    Returns:
        Spark dataframe: Spark dataframe that contain the geo future plus the
        above future.
    """
    f1_georeferencia = f1_georeferencia.toPandas()

    f1_georeferencia['q_ARRIS_Group'] = f1_georeferencia.\
        apply(lambda x: 1 if (x["Fabricante"]) ==
              'ARRIS Group, Inc.' else 0, axis=1)

    f1_georeferencia['q_Cisco_Systems_Inc'] = f1_georeferencia.\
        apply(lambda x: 1 if (x["Fabricante"]) ==
              'Cisco Systems, Inc' else 0, axis=1)

    f1_georeferencia['q_Technicolor'] = f1_georeferencia.\
        apply(lambda x: 1 if (x["Fabricante"]) ==
              'Technicolor CH USA Inc.' else 0, axis=1)

    # Suming.
    suma = f1_georeferencia['q_ARRIS_Group'].sum() +\
        f1_georeferencia['q_Cisco_Systems_Inc'].sum() +\
        f1_georeferencia['q_Technicolor'].sum()

    # Proportion.
    f1_georeferencia['p_ARRIS_Group'] = f1_georeferencia.\
        apply(lambda x: 1/suma if (x["Fabricante"]) ==
              'ARRIS Group, Inc.' else 0, axis=1)

    f1_georeferencia['p_Cisco_Systems_Inc'] = f1_georeferencia.\
        apply(lambda x: 1/suma if (x["Fabricante"]) ==
              'Cisco Systems, Inc' else 0, axis=1)

    f1_georeferencia['p_Technicolor'] = f1_georeferencia.\
        apply(lambda x: 1/suma if (x["Fabricante"]) ==
              'Technicolor CH USA Inc.' else 0, axis=1)

    f2_sum_prop = sqlContext.createDataFrame(f1_georeferencia)
    print('Final df for Sprint 2:\n')
    f2_sum_prop.show(truncate=False)
    return f2_sum_prop

In [75]:
quantity_proportion(sqlContext, df_union2)

Final df for Sprint 2:

+--------+-------------+---------+-----------------------------+-----------+--------------+--------+-------------------------------------------------+-------------+-------------------+-------------+--------------------+--------------------+-------------+
|id      |Id_fabricante|Media_mac|Fabricante                   |Zona_Censal|Manzana_Censal|Comuna  |geometry                                         |q_ARRIS_Group|q_Cisco_Systems_Inc|q_Technicolor|p_ARRIS_Group       |p_Cisco_Systems_Inc |p_Technicolor|
+--------+-------------+---------+-----------------------------+-----------+--------------+--------+-------------------------------------------------+-------------+-------------------+-------------+--------------------+--------------------+-------------+
|8849791 |1CE6C7       |F010E6   |Cisco Systems, Inc           |5          |33            |VITACURA|POINT (-70.5742612600000001 -33.3927527199999972)|0            |1                  |0            |0.0          

DataFrame[id: string, Id_fabricante: string, Media_mac: string, Fabricante: string, Zona_Censal: bigint, Manzana_Censal: bigint, Comuna: string, geometry: string, q_ARRIS_Group: bigint, q_Cisco_Systems_Inc: bigint, q_Technicolor: bigint, p_ARRIS_Group: double, p_Cisco_Systems_Inc: double, p_Technicolor: double]

In [76]:
quantity_proportion(sqlContext, df_union2)

print('==============')
print('Sprint 3')
print('==============')

# Year 2018.
print('==============')
print('Steps for 2018')
print('==============')

Final df for Sprint 2:

+--------+-------------+---------+-----------------------------+-----------+--------------+--------+-------------------------------------------------+-------------+-------------------+-------------+--------------------+--------------------+-------------+
|id      |Id_fabricante|Media_mac|Fabricante                   |Zona_Censal|Manzana_Censal|Comuna  |geometry                                         |q_ARRIS_Group|q_Cisco_Systems_Inc|q_Technicolor|p_ARRIS_Group       |p_Cisco_Systems_Inc |p_Technicolor|
+--------+-------------+---------+-----------------------------+-----------+--------------+--------+-------------------------------------------------+-------------+-------------------+-------------+--------------------+--------------------+-------------+
|8849791 |1CE6C7       |F010E6   |Cisco Systems, Inc           |5          |33            |VITACURA|POINT (-70.5742612600000001 -33.3927527199999972)|0            |1                  |0            |0.0          

In [77]:
def eighteen(sqlContext, df_2018, solo_santiago, mac_y_fabricante, df_oui,
             Manzana_Precensal, future_georef, lamba_rellenar):
    """Get the features for sprint 2.

    Args:
        sqlContext (Context): Pyspark environment.
        df_2018 (Spark dataframe): It's contain 2018 info.
        solo_santiago (Function): It's create a df for Santiago only.
        mac_y_fabricante (Function): It's create two new columns with
        Id_fabricante and Media_mac.
        df_oui (Spark dataframe): It's contain the id and name of the makers.
        Manzana_Precensal (Geopandas): It's contain the info of the shapefile.
        future_georef (Function): It's create the future with the geo.
        lamba_rellenar (Function): It's get the quantity and proportion of
        all wifi makers plus the above future.

    Returns:
        Spark dataframe: It's contain all features for sprint 2.
    """
    df_2018 = df_2018.drop('updated', 'data')
    df_2018 = solo_santiago(df_2018)
    f1_fab_2018 = mac_y_fabricante(df_2018)
    f1_geo_2018 = future_georef(
        sqlContext, f1_fab_2018, df_oui, Manzana_Precensal)
    f2_2018 = lamba_rellenar(sqlContext, f1_geo_2018).distinct()
    f2_2018 = f2_2018\
        .withColumnRenamed('q_Arris_Group', 'q2018_Arris_Group')\
        .withColumnRenamed('q_Cisco_Systems_Inc', 'q2018_Cisco_Systems_Inc')\
        .withColumnRenamed('q_Technicolor', 'q2018_Technicolor')\
        .withColumnRenamed('p_ARRIS_Group', 'p2018_ARRIS_Group')\
        .withColumnRenamed('p_Cisco_Systems_Inc', 'p2018_Cisco_Systems_Inc')\
        .withColumnRenamed('p_Technicolor', 'p2018_Technicolor')

    return f2_2018

In [78]:
f2_2018 = eighteen(sqlContext, df_2018, santiago_only, mac_maker,
                   df_oui, Manzana_Precensal, future_georef,
                   quantity_proportion)

Santiago´s dataframe:

+-------+------------+------------+-------------+---------+
|     id|         lat|         lon|Id_fabricante|Media_mac|
+-------+------------+------------+-------------+---------+
|2328903|-33.41190269|-70.59103650|       021A11|   FA6F64|
|3648783|-33.40335311|-70.57694492|       000C43|   305078|
|4218478|-33.41766917|-70.57476819|       10F96F|   EB5D31|
|6225902|-33.41709922|-70.59772660|       022A6A|   A0B39B|
|8837909|-33.41273061|-70.59400614|       BC9680|   DE5933|
|8849256|-33.39880726|-70.58795291|       586D8F|   19CB85|
|8849258|-33.39880726|-70.58795291|       728B97|   9DD133|
|8849264|-33.40762979|-70.60031867|       D83062|   2E7695|
|8849268|-33.39285484|-70.57425676|       A45D36|   3BE061|
|8849282|-33.39868293|-70.58754253|       F4F1E1|   D6A1D9|
|8849284|-33.39744255|-70.58129586|       90E6BA|   A1F845|
|8849285|-33.40206855|-70.59419039|       FC94E3|   31C7CB|
|8849288|-33.39563392|-70.58049829|       8C04FF|   13B85C|
|8849290|-33.4064

In [79]:
# Year 2019.
print('==============')
print('Steps for 2019')
print('==============')

Steps for 2019


In [80]:
def nineteen(sqlContext, df_2019, solo_santiago, mac_y_fabricante, df_oui,
             Manzana_Precensal, future_georef, lamba_rellenar, f2_2018):
    """Get the features for sprint 2.

    Args:
        sqlContext (Context): Pyspark environment.
        df_2019 (Spark dataframe): It's contain 2019 info.
        solo_santiago (Function): It's create a df for Santiago only.
        mac_y_fabricante (Function): It's create two new columns with
        Id_fabricante and Media_mac.
        df_oui (Spark dataframe): It's contain the id and name of the makers.
        Manzana_Precensal (Geopandas): It's contain the info of the shapefile.
        future_georef (Function): It's create the future with the geo.
        lamba_rellenar (Function): It's get the quantity and proportion of
        all wifi makers plus the above future.

    Returns:
        Spark dataframe: It's contain all features for sprint 2.
    """
    df_2019 = df_2019.drop('updated', 'data')
    df_2019 = solo_santiago(df_2019)
    f1_fab_2019 = mac_y_fabricante(df_2019)
    f1_geo_2019 = future_georef(
        sqlContext, f1_fab_2019, df_oui, Manzana_Precensal)
    f2_2019 = lamba_rellenar(sqlContext, f1_geo_2019).distinct()
    f2_2019 = f2_2019\
        .withColumnRenamed('q_Arris_Group', 'q2019_Arris_Group')\
        .withColumnRenamed('q_Cisco_Systems_Inc', 'q2019_Cisco_Systems_Inc')\
        .withColumnRenamed('q_Technicolor', 'q2019_Technicolor')\
        .withColumnRenamed('p_ARRIS_Group', 'p2019_ARRIS_Group')\
        .withColumnRenamed('p_Cisco_Systems_Inc', 'p2019_Cisco_Systems_Inc')\
        .withColumnRenamed('p_Technicolor', 'p2019_Technicolor')

    # Final dataframes for years 2018 and 2019.
    print('Final 2018 dataframe after drop duplicates:\n')
    f2_2018.show(truncate=False)
    print('Final 2019 dataframe after drop duplicates:\n')
    f2_2019.show(truncate=False)
    return f2_2019

In [81]:
f2_2019 = nineteen(sqlContext, df_2019, santiago_only, mac_maker,
                   df_oui, Manzana_Precensal, future_georef,
                   quantity_proportion,
                   f2_2018)

Santiago´s dataframe:

+-------+------------+------------+-------------+---------+
|     id|         lat|         lon|Id_fabricante|Media_mac|
+-------+------------+------------+-------------+---------+
|1285341|-33.41427150|-70.58757711|       0014D1|   C215FC|
|1286364|-33.39549419|-70.57584321|       0014D1|   C2B5CB|
|1286365|-33.39581286|-70.57651276|       0014D1|   C2B5CD|
|1289316|-33.41758751|-70.57447417|       FE1EDF|   FAF7CB|
|1384654|-33.41263839|-70.56671968|       5C969D|   6AB06B|
|2328903|-33.41190269|-70.59103650|       021A11|   FA6F64|
|3648783|-33.40335311|-70.57694492|       000C43|   305078|
|4218478|-33.41766917|-70.57476819|       10F96F|   EB5D31|
|6225902|-33.41709922|-70.59772660|       022A6A|   A0B39B|
|8837909|-33.41273061|-70.59400614|       BC9680|   DE5933|
|8849256|-33.39880726|-70.58795291|       586D8F|   19CB85|
|8849258|-33.39880726|-70.58795291|       728B97|   9DD133|
|8849264|-33.40762979|-70.60031867|       D83062|   2E7695|
|8849268|-33.3928

In [82]:
def differences(sqlContext, f2_2018, f2_2019):
    """It's create the future for differences between 2018 and 2019 regarding
    quantity and proportion.

    Args:
        sqlContext (context): Pyspark environment.
        f2_2018 (Spark dataframe): It's contain all 2018 with above features
        f2_2019 (Spark dataframe): It's contain all 2019 with above features.

    Returns:
    Spark dataframe: Spark dataframe that contain features before differences
    between years.
    """

    # f2_2019.subtract(f2_2018) gets the difference of f2_2018
    # from f2_2019. So the rows that are present in f2_2019
    # but not present in f2_2018 will be returned
    in_2019_not_2018 = f2_2019.subtract(f2_2018)
    print('This df contain the row differences between 2018 and 2019 years')
    in_2019_not_2018.show()

    # Create the missing columns in both df's using lit function.
    for column in [column for column in f2_2018.
                   columns if column not in in_2019_not_2018.columns]:
        in_2019_not_2018 = in_2019_not_2018.withColumn(column, lit(None))

    for column in [column for column in in_2019_not_2018.
                   columns if column not in f2_2018.columns]:
        f2_2018 = f2_2018.withColumn(column, lit(None))

    # Create a new df with 2018 plus all 2019 that doesn't exist in 2018.
    df_union3 = in_2019_not_2018.unionByName(f2_2018)
    df_union3 = df_union3.na.fill(0)
    print('This df contains all data to process the differences')
    df_union3.show(truncate=False)

    # Suming 2018 q's.
    df_union3 = df_union3.toPandas()
    suma2 = df_union3['q2018_Arris_Group'].sum() +\
        df_union3['q2018_Cisco_Systems_Inc'].sum() +\
        df_union3['q2018_Technicolor'].sum()

    # Apply the correct formula to get de p value for 2018.
    df_union3['p2018_ARRIS_Group'] = df_union3.\
        apply(lambda x: 1/suma2 if (x["q2018_Arris_Group"]) == 1 else 0,
              axis=1)

    df_union3['p2018_Cisco_Systems_Inc'] = df_union3.\
        apply(lambda x: 1/suma2 if (x["q2018_Cisco_Systems_Inc"]) == 1 else 0,
              axis=1)

    df_union3['p2018_Technicolor'] = df_union3.\
        apply(lambda x: 1 /
              suma2 if (x["q2018_Technicolor"]) == 1 else 0, axis=1)

    # Suming 2019 q's.
    suma3 = df_union3['q2019_Arris_Group'].sum() +\
        df_union3['q2019_Cisco_Systems_Inc'].sum() +\
        df_union3['q2019_Technicolor'].sum()

    # Filling all 2019.
    df_union3['q2019_Arris_Group'] = df_union3.\
        apply(lambda x: 1 if (x["Fabricante"]) == 'ARRIS Group, Inc.' else 0,
              axis=1)

    df_union3['q2019_Cisco_Systems_Inc'] = df_union3.\
        apply(lambda x: 1 if (x["Fabricante"]) == 'Cisco Systems, Inc' else 0,
              axis=1)

    df_union3['q2019_Technicolor'] = df_union3.\
        apply(lambda x: 1 if (x["Fabricante"]) ==
              'Technicolor CH USA Inc.' else 0, axis=1)

    # Apply the correct formula to get de p value for 2019.
    df_union3['p2019_ARRIS_Group'] = df_union3.\
        apply(lambda x: 1/suma3 if (x["q2019_Arris_Group"]) == 1 else 0,
              axis=1)

    df_union3['p2019_Cisco_Systems_Inc'] = df_union3.\
        apply(lambda x: 1/suma3 if (x["q2019_Cisco_Systems_Inc"]) == 1 else 0,
              axis=1)

    df_union3['p2019_Technicolor'] = df_union3.\
        apply(lambda x: 1 /
              suma3 if (x["q2019_Technicolor"]) == 1 else 0, axis=1)

    # I create now the new columns indicating the differences between q.
    df_union3['difq_ARRIS_Group'] = df_union3['q2019_Arris_Group']\
        - df_union3['q2018_Arris_Group']

    df_union3['difq_Cisco_Systems_Inc'] = df_union3['q2019_Cisco_Systems_Inc']\
        - df_union3['q2018_Cisco_Systems_Inc']

    df_union3['difq_Technicolor'] = df_union3['q2019_Technicolor']\
        - df_union3['q2018_Technicolor']

    # I create now the new columns indicating the differences between p.
    df_union3['difp_ARRIS_Group'] = df_union3['p2019_ARRIS_Group']\
        - df_union3['q2018_Arris_Group']

    df_union3['difp_Cisco_Systems_Inc'] = df_union3['p2019_Cisco_Systems_Inc']\
        - df_union3['q2018_Cisco_Systems_Inc']

    df_union3['difp_Technicolor'] = df_union3['p2019_Technicolor']\
        - df_union3['q2018_Technicolor']

    # Transforming to spark dataframe.
    df_union3 = sqlContext.createDataFrame(df_union3)
    print('Final df with all differences between 2018 and 2019:\n')
    df_union3.show(truncate=False, n=20)
    return df_union3

In [83]:
differences(sqlContext, f2_2018, f2_2019)

This df contain the row differences between 2018 and 2019 years
+--------+-------------+---------+--------------------+-----------+--------------+----------+--------------------+-----------------+-----------------------+-----------------+--------------------+-----------------------+--------------------+
|      id|Id_fabricante|Media_mac|          Fabricante|Zona_Censal|Manzana_Censal|    Comuna|            geometry|q2019_Arris_Group|q2019_Cisco_Systems_Inc|q2019_Technicolor|   p2019_ARRIS_Group|p2019_Cisco_Systems_Inc|   p2019_Technicolor|
+--------+-------------+---------+--------------------+-----------+--------------+----------+--------------------+-----------------+-----------------------+-----------------+--------------------+-----------------------+--------------------+
|11996341|       8C04FF|   BB4C42|Technicolor CH US...|          2|             2|LAS CONDES|POINT (-70.587140...|                0|                      0|                1|                 0.0|                  

DataFrame[id: string, Id_fabricante: string, Media_mac: string, Fabricante: string, Zona_Censal: bigint, Manzana_Censal: bigint, Comuna: string, geometry: string, q2019_Arris_Group: bigint, q2019_Cisco_Systems_Inc: bigint, q2019_Technicolor: bigint, p2019_ARRIS_Group: double, p2019_Cisco_Systems_Inc: double, p2019_Technicolor: double, q2018_Arris_Group: bigint, q2018_Cisco_Systems_Inc: bigint, q2018_Technicolor: bigint, p2018_ARRIS_Group: double, p2018_Cisco_Systems_Inc: double, p2018_Technicolor: double, difq_ARRIS_Group: bigint, difq_Cisco_Systems_Inc: bigint, difq_Technicolor: bigint, difp_ARRIS_Group: double, difp_Cisco_Systems_Inc: double, difp_Technicolor: double]

In [84]:
# Creating at least 10 new features.
# UDF for converting column type from vector to double type
df_to_scale = differences(sqlContext, f2_2018, f2_2019)
unlist = udf(lambda x: round(float(list(x)[0]), 4), DoubleType())

# Iterating over columns to be scaled
for i in ['q2019_Arris_Group', 'q2019_Cisco_Systems_Inc',
          'q2019_Technicolor', 'p2019_ARRIS_Group',
          'p2019_Cisco_Systems_Inc', 'p2019_Technicolor',
          'q2018_Arris_Group', 'q2018_Cisco_Systems_Inc',
          'q2018_Technicolor', 'p2018_ARRIS_Group',
          'p2018_Cisco_Systems_Inc', 'p2018_Technicolor',
          'difq_ARRIS_Group', 'difq_Cisco_Systems_Inc',
          'difq_Technicolor', 'difp_ARRIS_Group',
          'difp_Cisco_Systems_Inc', 'difp_Technicolor']:

    # VectorAssembler Transformation - Converting column to vector type
    assembler = VectorAssembler(inputCols=[i], outputCol=i+'_Vect')

    # MinMaxScaler Transformation
    scaler = MinMaxScaler(inputCol=i+'_Vect', outputCol=i+'_Scaled')

    # Pipeline of VectorAssembler and MinMaxScaler
    pipeline = Pipeline(stages=[assembler, scaler])

    # Fitting pipeline on dataframe
    df_to_scale = pipeline.fit(df_to_scale).transform(df_to_scale)\
        .withColumn(i+'_Scaled', unlist(i+'_Scaled')).drop(i+'_Vect')

print('Final df Sprint3 after Scaling :')
df_to_scale.show(n=20)

# Converting categorical data to string indexed format.
indexer = StringIndexer(inputCols=('Comuna', 'geometry', 'Id_fabricante'),
                        outputCols=('ComunaIndex', 'geoIndex', 'FabIndex'))
unlist = udf(lambda x: round(float(list(x)[0]), 4), DoubleType())
df_indexed = indexer.fit(df_to_scale).transform(df_to_scale)

# Iterating over columns to be scaled
for i in ['ComunaIndex', 'geoIndex', 'FabIndex', 'Zona_Censal',
          'Manzana_Censal']:
    # VectorAssembler Transformation - Converting column to vector type
    assembler = VectorAssembler(inputCols=[i], outputCol=i+'_Vect')

    # MinMaxScaler Transformation
    scaler = MinMaxScaler(inputCol=i+'_Vect', outputCol=i+'_Scaled')

    # Pipeline of VectorAssembler and MinMaxScaler
    pipeline = Pipeline(stages=[assembler, scaler])

    # Fitting pipeline on dataframe
    df_indexed = pipeline.fit(df_indexed).transform(df_indexed)\
        .withColumn(i+'_Scaled', unlist(i+'_Scaled')).drop(i+'_Vect')
print('Df  after Indexing strings and all data scaled :')
df_indexed.show()
df_indexed.printSchema()

This df contain the row differences between 2018 and 2019 years
+--------+-------------+---------+--------------------+-----------+--------------+----------+--------------------+-----------------+-----------------------+-----------------+--------------------+-----------------------+--------------------+
|      id|Id_fabricante|Media_mac|          Fabricante|Zona_Censal|Manzana_Censal|    Comuna|            geometry|q2019_Arris_Group|q2019_Cisco_Systems_Inc|q2019_Technicolor|   p2019_ARRIS_Group|p2019_Cisco_Systems_Inc|   p2019_Technicolor|
+--------+-------------+---------+--------------------+-----------+--------------+----------+--------------------+-----------------+-----------------------+-----------------+--------------------+-----------------------+--------------------+
|11996341|       8C04FF|   BB4C42|Technicolor CH US...|          2|             2|LAS CONDES|POINT (-70.587140...|                0|                      0|                1|                 0.0|                  

In [85]:
# Sort the DF to work in predictions.
final_df = df_indexed.select('q2019_Arris_Group_Scaled',
                             'q2019_Cisco_Systems_Inc_Scaled',
                             'q2019_Technicolor_Scaled',
                             'p2019_ARRIS_Group_Scaled',
                             'p2019_Cisco_Systems_Inc_Scaled',
                             'p2019_Technicolor_Scaled',
                             'q2018_Arris_Group_Scaled',
                             'q2018_Cisco_Systems_Inc_Scaled',
                             'q2018_Technicolor_Scaled',
                             'p2018_ARRIS_Group_Scaled',
                             'p2018_Cisco_Systems_Inc_Scaled',
                             'p2018_Technicolor_Scaled',
                             'difq_ARRIS_Group_Scaled',
                             'difq_Cisco_Systems_Inc_Scaled',
                             'difq_Technicolor_Scaled',
                             'difp_ARRIS_Group_Scaled',
                             'difp_Cisco_Systems_Inc_Scaled',
                             'difp_Technicolor_Scaled',
                             'ComunaIndex_Scaled',
                             'geoIndex_Scaled',
                             'FabIndex_Scaled',
                             'Zona_Censal_Scaled',
                             'Manzana_Censal_Scaled')

In [89]:
print('Final tableu to use with LightGBM')
final_df.show()
final_df.printSchema()

Final tableu to use with LightGBM
+------------------------+------------------------------+------------------------+------------------------+------------------------------+------------------------+------------------------+------------------------------+------------------------+------------------------+------------------------------+------------------------+-----------------------+-----------------------------+-----------------------+-----------------------+-----------------------------+-----------------------+------------------+---------------+---------------+------------------+---------------------+
|q2019_Arris_Group_Scaled|q2019_Cisco_Systems_Inc_Scaled|q2019_Technicolor_Scaled|p2019_ARRIS_Group_Scaled|p2019_Cisco_Systems_Inc_Scaled|p2019_Technicolor_Scaled|q2018_Arris_Group_Scaled|q2018_Cisco_Systems_Inc_Scaled|q2018_Technicolor_Scaled|p2018_ARRIS_Group_Scaled|p2018_Cisco_Systems_Inc_Scaled|p2018_Technicolor_Scaled|difq_ARRIS_Group_Scaled|difq_Cisco_Systems_Inc_Scaled|difq_Technico

In [112]:
def LightGBM(final_df):
    """Applying LightGBM.

    Args:
        final_df (Spark Dataframe): Final Tableu.
    """
    final_df = final_df.toPandas()

    X = final_df[['q2019_Arris_Group_Scaled',
                  'q2019_Cisco_Systems_Inc_Scaled',
                  'q2019_Technicolor_Scaled',
                  'p2019_ARRIS_Group_Scaled',
                  'p2019_Cisco_Systems_Inc_Scaled',
                  'p2019_Technicolor_Scaled',
                  'q2018_Arris_Group_Scaled',
                  'q2018_Cisco_Systems_Inc_Scaled',
                  'q2018_Technicolor_Scaled',
                  'p2018_ARRIS_Group_Scaled',
                  'p2018_Cisco_Systems_Inc_Scaled',
                  'p2018_Technicolor_Scaled',
                  'difq_Technicolor_Scaled',
                  'difp_ARRIS_Group_Scaled',
                  'difp_Cisco_Systems_Inc_Scaled',
                  'difp_Technicolor_Scaled',
                  'difq_ARRIS_Group_Scaled',
                  'ComunaIndex_Scaled',
                  'geoIndex_Scaled',
                  'FabIndex_Scaled',
                  'Zona_Censal_Scaled',
                  'Manzana_Censal_Scaled']]

    y = final_df['difq_Cisco_Systems_Inc_Scaled']

    X_train, X_test, y_train, y_test = train_test_split(X, y,
                                                        test_size=0.2,
                                                        random_state=0)

    # Build the lightgbm model
    clf = lgb.LGBMClassifier()
    clf.fit(X_train, y_train)

    # Predict the results
    y_pred = clf.predict(X_test)
    Sum = sum(y_pred)
    print('Wifi hotspot difference for next Year', Sum)
    print('The predictions are', y_pred)

    # View accuracy
    print('LightGBM Model accuracy score: {0:0.4f}'.format(accuracy_score
                                                           (y_test, y_pred)))

In [113]:
LightGBM(final_df)

Wifi hotspot difference for next Year 482.0
The predictions are [0. 1. 0. ... 0. 0. 0.]
LightGBM Model accuracy score: 1.0000


In [103]:
final_df.groupBy().sum().collect()

[Row(sum(q2019_Arris_Group_Scaled)=4128.0, sum(q2019_Cisco_Systems_Inc_Scaled)=3123.0, sum(q2019_Technicolor_Scaled)=2106.0, sum(p2019_ARRIS_Group_Scaled)=4128.0, sum(p2019_Cisco_Systems_Inc_Scaled)=3123.0, sum(p2019_Technicolor_Scaled)=2106.0, sum(q2018_Arris_Group_Scaled)=2884.0, sum(q2018_Cisco_Systems_Inc_Scaled)=2134.0, sum(q2018_Technicolor_Scaled)=1440.0, sum(p2018_ARRIS_Group_Scaled)=2884.0, sum(p2018_Cisco_Systems_Inc_Scaled)=2134.0, sum(p2018_Technicolor_Scaled)=1440.0, sum(difq_ARRIS_Group_Scaled)=1244.0, sum(difq_Cisco_Systems_Inc_Scaled)=989.0, sum(difq_Technicolor_Scaled)=666.0, sum(difp_ARRIS_Group_Scaled)=17707.059600003897, sum(difp_Cisco_Systems_Inc_Scaled)=18456.75810000412, sum(difp_Technicolor_Scaled)=19150.45300000432, sum(ComunaIndex_Scaled)=651.9309000000214, sum(geoIndex_Scaled)=7156.859800000037, sum(FabIndex_Scaled)=2528.671599999958, sum(Zona_Censal_Scaled)=5919.800000000047, sum(Manzana_Censal_Scaled)=1018.5304999999861)]