# Tarea 8 - Luigi

_175904 - Jorge III Altamirano Astorga_


In [35]:
import os
import csv
import json
import luigi
import time
import random
import requests
from luigi.contrib.postgres import CopyToTable, PostgresTarget, PostgresQuery

In [67]:
class DeleteTableProducto(PostgresQuery):
    producto = luigi.Parameter()
    host = os.environ.get('DB_HOST', '172.17.0.6:5432')
    database = os.environ.get('DB_DATABASE', 'QQP')
    user = os.environ.get('DB_USER', 'QQP')
    password = os.environ.get('DB_PASSWORD', 'q1q2p')
    port = os.environ.get('DB_PORT', 5432)
    table = os.environ.get('DB_TABLE','PRODUCTO')
    update_id = str(int(round(time.time() * 1000) * random.random()))

    @property
    def query(self):
        return "DELETE FROM PRODUCTO;"


class DownloadProduct(luigi.Task):
    producto = luigi.Parameter()

    def requires(self):
        return DeleteTableProducto(self.producto)

    def run(self):
        page = 1
        must_continue = True
        list_product = []

        while must_continue:
            print("Peticion al API pagina: ", str(page))
            self.set_status_message("Peticion al API QQP, producto: {} pagina: {}" \
                                    .format(self.producto, str(page)))
            params_response = {'producto': self.producto, 'page': str(page), 'pageSize': str(1000)}
            if(len(self.producto) < 1): params_response = {'page': str(page), 'pageSize': str(1000)}
            response = requests.get('https://api.datos.gob.mx/v1/profeco.precios', 
                                    params=params_response)
            print("Respuesta del servidor", response.status_code)
            if response.status_code == 200:
                json_response = response.json().get('results', [])
                must_continue = len(json_response) > 0

                if must_continue:
                    list_product.extend(json_response)
                    page += 1

        if len(list_product) > 0:
            with self.output().open('w') as json_file:
                json.dump(list_product, json_file)

    def output(self):
        return luigi.LocalTarget('/tmp/qqp/{}/data.json'.format(self.producto))


class ConvertJSONToCSV(luigi.Task):
    producto = luigi.Parameter()

    def requires(self):
        return DownloadProduct(self.producto)

    def run(self):
        with self.input().open('r') as json_file:
            json_product = json.load(json_file)

        print(len(json_product))
        headers = json_product[0].keys()

        with open('/tmp/qqp/{0}/headers.csv'.format(self.producto), 'w+') as header_file:
            json.dump(list(headers), header_file)

        with self.output().open('w') as csv_file:
            writer = csv.writer(csv_file, delimiter='|', quotechar='"')

            for product in json_product:
                writer.writerow(list(product.values()))

    def output(self):
        return luigi.LocalTarget('/tmp/qqp/{0}/data.csv'.format(self.producto))


class InsertDataInDataBase(CopyToTable):
    producto = luigi.Parameter()
    host = os.environ.get('DB_HOST', '172.17.0.6:5432')
    database = os.environ.get('DB_DATABASE', 'QQP')
    user = os.environ.get('DB_USER', 'QQP')
    password = os.environ.get('DB_PASSWORD', 'q1q2p')
    port = os.environ.get('DB_PORT', 5432)
    table = os.environ.get('DB_TABLE','PRODUCTO')
    update_id = str(int(round(time.time() * 1000) * random.random()))
    column_separator = "|"

    @property
    def columns(self):
        with open('/tmp/qqp/{0}/headers.csv'.format(self.producto), 'r') as header_file:
            return json.load(header_file)

    def requires(self):
        return ConvertJSONToCSV(self.producto)


class DropAggTableIfExists(PostgresQuery):
    producto = luigi.Parameter()
    host = os.environ.get('DB_HOST', '172.17.0.6:5432')
    database = os.environ.get('DB_DATABASE', 'QQP')
    user = os.environ.get('DB_USER', 'QQP')
    password = os.environ.get('DB_PASSWORD', 'q1q2p')
    port = os.environ.get('DB_PORT', 5432)
    table = os.environ.get('DB_TABLE','PRODUCTO')
    update_id = str(int(round(time.time() * 1000) * random.random()))

    @property
    def query(self):
        return "DROP TABLE IF EXISTS agg_{0};".format(self.producto.lower().replace(' ', '_'))

    def requires(self):
        return InsertDataInDataBase(self.producto)


class AggretateByState(PostgresQuery):
    producto = luigi.Parameter()
    host = os.environ.get('DB_HOST', '172.17.0.6:5432')
    database = os.environ.get('DB_DATABASE', 'QQP')
    user = os.environ.get('DB_USER', 'QQP')
    password = os.environ.get('DB_PASSWORD', 'q1q2p')
    port = os.environ.get('DB_PORT', 5432)
    table = os.environ.get('DB_TABLE','PRODUCTO')
    update_id = str(int(round(time.time() * 1000) * random.random() ))

    @property
    def query(self):
        return "SELECT AVG(precio), cadenaComercial INTO agg_{0} FROM PRODUCTO GROUP BY cadenaComercial;" \
            .format(self.producto.lower().replace(' ', '_'))

    def requires(self):
        return DropAggTableIfExists(self.producto)


class StartPipeline(luigi.Task):
    producto = luigi.Parameter()

    def requires(self):
        return AggretateByState(self.producto)
    
luigi.build([StartPipeline(producto="")])

DEBUG: Checking if StartPipeline(producto=) is complete
  is_complete = task.complete()
DEBUG: Checking if AggretateByState(producto=) is complete
INFO: Informed scheduler that task   StartPipeline__7a52092eb8   has status   PENDING
DEBUG: Checking if DropAggTableIfExists(producto=) is complete
INFO: Informed scheduler that task   AggretateByState__7a52092eb8   has status   PENDING
DEBUG: Checking if InsertDataInDataBase(producto=) is complete
INFO: Informed scheduler that task   DropAggTableIfExists__7a52092eb8   has status   PENDING
DEBUG: Checking if ConvertJSONToCSV(producto=) is complete
INFO: Informed scheduler that task   InsertDataInDataBase__7a52092eb8   has status   PENDING
INFO: Informed scheduler that task   ConvertJSONToCSV__7a52092eb8   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 4
INFO: [pid 24606] Worker Worker(salt=561148329, workers=1, host=jupyter.corp.penoles.mx, userna

False

In [119]:
import boto3

emr = boto3.client("emr")
clusters = emr.list_clusters()["Clusters"]

for cluster in clusters:
    status = cluster["Status"]
    instances = emr.list_instances(ClusterId = cluster["Id"])["Instances"]
    print("Cluster ID=%s, status=%s"%(cluster["Id"], status["State"]))
    for instance in instances:
        print("    Instance ID=%s, status=%s"%(instance["Id"], instance["Status"]["State"]))

Cluster ID=j-2OJ18JX0F6K2S, status=TERMINATED
    Instance ID=ci-1LHWIL1RFFNCE, status=TERMINATED
    Instance ID=ci-90HEN7609QQ9, status=TERMINATED
    Instance ID=ci-2L049BXYMZV6G, status=TERMINATED
Cluster ID=j-3GO9154OA62RJ, status=TERMINATED
    Instance ID=ci-2REJ34JANE3MY, status=TERMINATED
    Instance ID=ci-2TXXYIJK1O5PY, status=TERMINATED
    Instance ID=ci-2ESSAVS5YBV6T, status=TERMINATED
Cluster ID=j-1V0489WY8SV6, status=TERMINATED
    Instance ID=ci-NGBUGUJ9EN44, status=TERMINATED
    Instance ID=ci-BVR64HF7HHNE, status=TERMINATED
    Instance ID=ci-2MM0THW8YP24H, status=TERMINATED
Cluster ID=j-6UU7XES6A0L0, status=TERMINATED
    Instance ID=ci-3CF2KSV0V29JJ, status=TERMINATED
    Instance ID=ci-2D998SEN7L5GI, status=TERMINATED
    Instance ID=ci-3V951XDQBO319, status=TERMINATED
Cluster ID=j-1T0U5VYWD2ZWR, status=TERMINATED
    Instance ID=ci-1ICYNBTH1ZILR, status=TERMINATED
    Instance ID=ci-2EBF7Y7EWWJR6, status=TERMINATED
    Instance ID=ci-2I3RRMECH4KRI, status=TERMIN

In [156]:
%%writefile run.sh
git clone git://github.com/yyuu/pyenv.git ~/.pyenv; 
export PYENV=~/.pyenv; 
export PATH="$PYENV/bin":"$PATH"; 
eval "$(pyenv init -)";
# al parecer versiones 3.5.4 encuentran este bug
# https://github.com/RobotWebTools/rosbridge_suite/issues/154
pyenv install 3.4.7 
pyenv shell 3.4.7 && \ 
python3 -m pip install -r requirements.txt && \
luigid --background

Overwriting run.sh


In [157]:
%%bash
pwd
chmod a+rx run.sh
./run.sh

/home/jaa6766/Documents/jorge3a/itam/metodos_gran_escala/alumnos/jorge_altamirano/tarea_8


fatal: destination path '/home/jaa6766/.pyenv' already exists and is not an empty directory.
pyenv: /home/jaa6766/.pyenv/versions/3.4.7 already exists
./run.sh: line 8:  : command not found
You are using pip version 9.0.1, however version 10.0.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.
Traceback (most recent call last):
  File "/home/jaa6766/.pyenv/versions/3.4.7/bin/luigid", line 11, in <module>
    load_entry_point('luigi==2.7.5', 'console_scripts', 'luigid')()
  File "/home/jaa6766/.pyenv/versions/3.4.7/lib/python3.4/site-packages/luigi/cmdline.py", line 15, in luigid
    import luigi.server
  File "/home/jaa6766/.pyenv/versions/3.4.7/lib/python3.4/site-packages/luigi/server.py", line 48, in <module>
    import tornado.httpserver
  File "/home/jaa6766/.pyenv/versions/3.4.7/lib/python3.4/site-packages/tornado/httpserver.py", line 34, in <module>
    from tornado.http1connection import HTTP1ServerConnection, HTTP1ConnectionParameters
  F

In [164]:
%%bash
export PYENV=~/.pyenv; 
export PATH="$PYENV/bin":"$PATH"; 
eval "$(pyenv init -)";
pyenv shell 3.4.7
luigid --background 



Traceback (most recent call last):
  File "/home/jaa6766/.pyenv/versions/3.4.7/bin/luigid", line 11, in <module>
    load_entry_point('luigi==2.7.5', 'console_scripts', 'luigid')()
  File "/home/jaa6766/.pyenv/versions/3.4.7/lib/python3.4/site-packages/luigi/cmdline.py", line 15, in luigid
    import luigi.server
  File "/home/jaa6766/.pyenv/versions/3.4.7/lib/python3.4/site-packages/luigi/server.py", line 48, in <module>
    import tornado.httpserver
  File "/home/jaa6766/.pyenv/versions/3.4.7/lib/python3.4/site-packages/tornado/httpserver.py", line 34, in <module>
    from tornado.http1connection import HTTP1ServerConnection, HTTP1ConnectionParameters
  File "/home/jaa6766/.pyenv/versions/3.4.7/lib/python3.4/site-packages/tornado/http1connection.py", line 28, in <module>
    from tornado import gen
  File "/home/jaa6766/.pyenv/versions/3.4.7/lib/python3.4/site-packages/tornado/gen.py", line 89, in <module>
    from tornado.ioloop import IOLoop
  File "/home/jaa6766/.pyenv/versions/3.

In [66]:
#importo librerías
import pyspark
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.functions import *
from pyspark.sql import DataFrameStatFunctions, DataFrame
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import *
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import pandas as pd
import re as re
import time

In [3]:
#arranque de Spark
conf = SparkConf()
conf.set("spark.driver.memory", "16g")
conf.set("spark.driver.cores", 4)
conf.set("spark.driver.memoryOverhead", 0.9)
conf.set("spark.executor.memory", "32g")
conf.set("spark.executor.cores", 12)
conf.set("spark.jars", "/home/jaa6766")
sc = SparkContext(master = "local[14]", sparkHome="/usr/local/spark/", 
                  appName="tarea-mge-8", conf=conf)
spark = SQLContext(sc)

In [87]:
%%time
data = spark.read.csv("hdfs://172.17.0.2:9000/data/profeco/data.csv", 
                      schema = StructType() \
                        .add("producto", StringType(), False) \
                        .add("presentacion", StringType(), True) \
                        .add("marca", StringType(), True) \
                        .add("categoria", StringType(), True) \
                        .add("catalogo", StringType(), True) \
                        .add("precio", DecimalType(precision=16, scale=4), True) \
                        .add("fechaRegistro", TimestampType(), True) \
                        .add("cadenaComercial", StringType(), True) \
                        .add("giro", StringType(), True) \
                        .add("nombreComercial", StringType(), True) \
                        .add("direccion", StringType(), True) \
                        .add("estado", StringType(), True) \
                        .add("municipio", StringType(), True) \
                        .add("latitud", StringType(), True) \
                        .add("longitud", StringType(), True),
                      inferSchema=False,
                      escape='"',
                      quote='"',
                      timestampFormat="yyyy-MM-dd hh:mm:ss",
                      header=True)
data.write.parquet("hdfs://172.17.0.2:9000/data/profeco/data.parquet", mode="overwrite")
data = spark.read.parquet("hdfs://172.17.0.2:9000/data/profeco/data.parquet")
data.show(2)
data.printSchema()

+--------------------+--------------------+-----+--------------------+------------+--------+-------------------+--------------------+--------------------+--------------------+--------------------+----------------+-------------------+---------+----------+
|            producto|        presentacion|marca|           categoria|    catalogo|  precio|      fechaRegistro|     cadenaComercial|                giro|     nombreComercial|           direccion|          estado|          municipio|  latitud|  longitud|
+--------------------+--------------------+-----+--------------------+------------+--------+-------------------+--------------------+--------------------+--------------------+--------------------+----------------+-------------------+---------+----------+
|             GAVINDO|CAJA CON 30 TABLETAS|  S/M|        MEDICAMENTOS|MEDICAMENTOS|644.7000|2016-02-11 14:08:13|FARMACIA GUADALAJARA|TIENDA DE AUTOSER...|FARMACIA GUADALAJ...|HIDALGO 42, COL. ...|          MÉXICO|       TLALNEPANTLA|  

In [7]:
%%time
data.approxQuantile("precio", [0.1, 0.5, 0.85], 0.1)

CPU times: user 15.1 ms, sys: 8.36 ms, total: 23.5 ms
Wall time: 7.32 s


[0.1, 44.2, 419.0]

In [8]:
%%time 
data.rdd.countApprox(timeout=1, confidence=0.9)


CPU times: user 31.7 ms, sys: 15.2 ms, total: 46.9 ms
Wall time: 4min 5s


62530715

In [139]:
def summary_j3a(col):
    cnt1 = pd.DataFrame([{"count": data.count()}]).transpose()
    min1 = data.select(min(col).alias("min")).toPandas().transpose()
    max1 = data.select(max(col).alias("max")).toPandas().transpose()
    avg1 = data.select(mean(col).alias("avg")).toPandas().transpose()
    std1 = data.select(stddev(col).alias("stddev")).toPandas().transpose()
    probs = [0.25, 0.5, 0.75]
    qnt1 = pd.DataFrame(  \
        data.approxQuantile(col, probabilities=probs, relativeError=0.05)
    )
    qnt1 = qnt1.rename_axis({0: "25%", 1: "50%", 2: "75%"}, axis=0)
    complete = cnt1.append(min1).append(qnt1).append(max1).append(avg1).append(std1)
    complete = complete.rename(index=str, columns={0: col})
    complete[col] = complete.apply(lambda x: "{:,}".format(x[col]), axis=1)
    return complete
def summary_string_j3a(col):
    cnt1 = pd.DataFrame([{"count": data.count()}]).transpose()
    min1 = data.select(min(col).alias("min")).toPandas().transpose()
    max1 = data.select(max(col).alias("max")).toPandas().transpose()
    complete = cnt1.append(min1).append(max1)
    complete = complete.rename(index=str, columns={0: col})
    return complete
%time summary_j3a("precio")

CPU times: user 59.6 ms, sys: 22.6 ms, total: 82.2 ms
Wall time: 11.4 s


Unnamed: 0,precio
count,62530715.0
min,0.1
25%,20.0
50%,40.9
75%,268.2
max,299999.0
avg,516.56992236
stddev,1998.620808903092


In [144]:
def summary_string_j3a(col):
    cnt1 = pd.DataFrame([{"count": data.count()}]).transpose()
    min1 = data.select(min(col).alias("min")).toPandas().transpose()
    max1 = data.select(max(col).alias("max")).toPandas().transpose()
    complete = cnt1.append(min1).append(max1)
    complete = complete.rename(index=str, columns={0: col})
    return complete
%time summary_string_j3a("producto")

CPU times: user 29.8 ms, sys: 10.5 ms, total: 40.3 ms
Wall time: 5.63 s


Unnamed: 0,producto
count,62530715
min,A.S.COR
max,ZYPREXA


In [9]:
%%time
data.select(approxCountDistinct("producto", rsd=0.01)).show()
print(data.count())

+-------------------------------+
|approx_count_distinct(producto)|
+-------------------------------+
|                           1100|
+-------------------------------+

62530715
CPU times: user 6 ms, sys: 2.85 ms, total: 8.85 ms
Wall time: 6.37 s


In [77]:
data.select(approxCountDistinct("estado", rsd=0.01)).show()

+-----------------------------+
|approx_count_distinct(estado)|
+-----------------------------+
|                           32|
+-----------------------------+



In [74]:
%%time
data.select("estado").distinct().orderBy("estado").show(35)

+--------------------+
|              estado|
+--------------------+
|                null|
|      AGUASCALIENTES|
|     BAJA CALIFORNIA|
| BAJA CALIFORNIA SUR|
|            CAMPECHE|
|             CHIAPAS|
|           CHIHUAHUA|
|COAHUILA DE ZARAGOZA|
|              COLIMA|
|    DISTRITO FEDERAL|
|             DURANGO|
|          GUANAJUATO|
|            GUERRERO|
|             HIDALGO|
|             JALISCO|
| MICHOACÁN DE OCAMPO|
|             MORELOS|
|              MÉXICO|
|             NAYARIT|
|          NUEVO LEÓN|
|              OAXACA|
|              PUEBLA|
|           QUERÉTARO|
|        QUINTANA ROO|
|     SAN LUIS POTOSÍ|
|             SINALOA|
|              SONORA|
|             TABASCO|
|          TAMAULIPAS|
|            TLAXCALA|
|VERACRUZ DE IGNAC...|
|             YUCATÁN|
|           ZACATECAS|
+--------------------+

CPU times: user 6.31 ms, sys: 4.99 ms, total: 11.3 ms
Wall time: 2.02 s


In [73]:
data.filter("estado is null").show()

+--------+------------+-----+---------+--------+------+-------------+---------------+----+---------------+---------+------+---------+-------+--------+
|producto|presentacion|marca|categoria|catalogo|precio|fechaRegistro|cadenaComercial|giro|nombreComercial|direccion|estado|municipio|latitud|longitud|
+--------+------------+-----+---------+--------+------+-------------+---------------+----+---------------+---------+------+---------+-------+--------+
|    null|        null| null|     null|    null|  null|         null|           null|null|           null|     null|  null|     null|   null|    null|
|    null|        null| null|     null|    null|  null|         null|           null|null|           null|     null|  null|     null|   null|    null|
|    null|        null| null|     null|    null|  null|         null|           null|null|           null|     null|  null|     null|   null|    null|
|    null|        null| null|     null|    null|  null|         null|           null|null|    

In [69]:
data.filter("direccion like '%ESQ. SUR 125%' ").show()

+------------------+------------+-----+---------+--------+------+--------------------+--------------------+--------------------+---------------+--------------------+----------------+--------------------+---------+----------+
|          producto|presentacion|marca|categoria|catalogo|precio|       fechaRegistro|     cadenaComercial|                giro|nombreComercial|           direccion|          estado|           municipio|  latitud|  longitud|
+------------------+------------+-----+---------+--------+------+--------------------+--------------------+--------------------+---------------+--------------------+----------------+--------------------+---------+----------+
|PAN BLANCO BOLILLO|       PIEZA|  S/M|      PAN| BASICOS|   1.5|2014-12-09 00:00:...|PANADERIAS TRADIC...|PANADERIA Y PASTE...|  PAN BARCELONA|ORIENTE 100 ""A""...|DISTRITO FEDERAL|IZTACALCO        ...|19.395734|-99.101886|
|PAN BLANCO BOLILLO|       PIEZA|  S/M|      PAN| BASICOS|   1.5|2012-10-23 00:00:...|PANADERIAS TRA

In [23]:
data.select(col("fechaRegistro")).show(truncate=False)

+-------------------+
|fechaRegistro      |
+-------------------+
|2016-02-11 14:08:13|
|2016-02-11 14:08:14|
|2016-02-11 14:08:14|
|2016-02-11 14:08:14|
|2016-02-11 14:08:15|
|2016-02-11 14:08:16|
|2016-02-11 14:08:17|
|2016-02-11 14:08:19|
|2016-02-11 14:08:19|
|2016-02-11 14:08:19|
|2016-02-11 14:08:20|
|2016-02-11 14:08:20|
|2016-02-11 14:08:22|
|2016-02-11 14:08:22|
|2016-02-11 14:08:24|
|2016-02-11 14:08:25|
|2016-02-11 14:08:27|
|2016-02-11 14:08:27|
|2016-02-11 14:08:27|
|2016-02-11 14:08:28|
+-------------------+
only showing top 20 rows



## Bibliografía

* <https://github.com/spotify/luigi/>
* <https://github.com/spotify/luigi/issues/1116>
* <https://github.com/spotify/luigi/blob/master/examples/pyspark_wc.py>
* <http://bionics.it/posts/luigi-tutorial>
* <https://stackoverflow.com/questions/2697039/python-equivalent-of-setinterval>
* <https://stackoverflow.com/questions/40218393/how-to-configure-luigi-task-retry-correctly>
* <https://stackoverflow.com/questions/23302184/running-pyspark-script-on-emr>
* <https://stackoverflow.com/questions/9942594/unicodeencodeerror-ascii-codec-cant-encode-character-u-xa0-in-position-20>
* <https://stackoverflow.com/a/39293287>

In [78]:
sc.stop()