# Example

This is an example notebook.

Here we demonstrate the basics of PySpark while following an ETL (extract, transform, load) workflow.

The data will be loaded from the local filesystem and written to Elasticsearch.
After that, the processed data will be available for visualization in Kibana.

For more help, check the following links:
- https://docs.azuredatabricks.net/spark/latest/dataframes-datasets/introduction-to-dataframes-python.html

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName("HelloWorldApp").getOrCreate()

### Extract

Our data doesn't need to be extracted from an external source; we only need to unzip it.


In [2]:
%%sh
# -o overwrites existing files without prompting, so the cell can be re-run
unzip -o /data/kpi.zip -d /tmp
ls /tmp
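If a `%%sh` cell is not an option, the same extraction can be sketched in Python with the standard-library `zipfile` module, whose `extractall()` overwrites existing files instead of prompting. `extract_archive` is a hypothetical helper name, not part of the original notebook.

```python
import os
import zipfile


def extract_archive(archive_path, dest_dir):
    """Unzip archive_path into dest_dir and return the member names.

    Existing files in dest_dir are overwritten without prompting,
    similar to `unzip -o`.
    """
    os.makedirs(dest_dir, exist_ok=True)
    with zipfile.ZipFile(archive_path) as zf:
        zf.extractall(dest_dir)
        return zf.namelist()
```

Called as `extract_archive("/data/kpi.zip", "/tmp")`, it would unzip the same archive as the shell cell, and running it a second time simply replaces the files.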


### Exploration

In [4]:
df = spark.read.option("delimiter", "|").option("header", "true").csv('/tmp/BOT_*MESSAGE*.txt')

In [5]:
print(df.count(), len(df.columns))

(344, 27)


In [6]:
df.columns

['USER_ID',
 'MSG_DT',
 'MSG_ID',
 'ACTION_CD',
 'AURA_ID',
 'PHONE_ID',
 'CHANNEL_CD',
 'SUBSCRIPTION_CD',
 'DOMAIN_CD',
 'CATEGORY_CD',
 'COUNTRY_CD',
 'CORR_ID',
 'IS_CACHED',
 'STATUS_CD',
 'REASON',
 'VERSION_ID',
 'LANG_CD',
 'TZ_CD',
 'DURATION_NU',
 'MESSAGE',
 'DIALOG_ID',
 'CONVERSATION_ID',
 'WIN_RECOGNIZER_CD',
 'WIN_RECOGNIZER_SCORE_NU',
 'INTENT',
 'ENTITIES',
 'MODALITY_CD']

In [7]:
df.first()

Row(USER_ID=u'11995504724', MSG_DT=u'2019-02-08T13:00:00.527Z', MSG_ID=u'e0426ac7-5576-4995-827a-400d4e01ccb7', ACTION_CD=u'send', AURA_ID=u'facebook_-_2058564260851298', PHONE_ID=u'+5511995504724', CHANNEL_CD=u'facebook', SUBSCRIPTION_CD=u'postpaid', DOMAIN_CD=None, CATEGORY_CD=None, COUNTRY_CD=u'br', CORR_ID=u'06d6a0b7-74f4-5d5c-9655-370e7e7f288a', IS_CACHED=u'false', STATUS_CD=u'200', REASON=None, VERSION_ID=u'2.3.1', LANG_CD=u'pt-br', TZ_CD=u'America/Sao_Paulo', DURATION_NU=u'6087', MESSAGE=u'Seu plano de dados \xe9 ilimitado', DIALOG_ID=u'services:service-usage', CONVERSATION_ID=u'2058564260851298-450287408662664', WIN_RECOGNIZER_CD=None, WIN_RECOGNIZER_SCORE_NU=None, INTENT=u'tef.int.pt_BR.fb.usage.check', ENTITIES=u'de dados,dados', MODALITY_CD=u'text')

In [8]:
receive_messages = df[df['ACTION_CD']=='receive']
send_messages = df[df['ACTION_CD']=='send']

In [9]:
df.select("USER_ID", "MSG_DT", "ACTION_CD", "AURA_ID", "DURATION_NU").take(10)

[Row(USER_ID=u'11995504724', MSG_DT=u'2019-02-08T13:00:00.527Z', ACTION_CD=u'send', AURA_ID=u'facebook_-_2058564260851298', DURATION_NU=u'6087'),
 Row(USER_ID=u'11995504724', MSG_DT=u'2019-02-08T13:00:00.530Z', ACTION_CD=u'send', AURA_ID=u'facebook_-_2058564260851298', DURATION_NU=u'6154'),
 Row(USER_ID=u'11995504724', MSG_DT=u'2019-02-08T13:00:00.531Z', ACTION_CD=u'send', AURA_ID=u'facebook_-_2058564260851298', DURATION_NU=u'6156'),
 Row(USER_ID=None, MSG_DT=u'2019-02-08T13:00:16.738Z', ACTION_CD=u'receive', AURA_ID=u'facebook_-_2058564260851298', DURATION_NU=None),
 Row(USER_ID=u'11995504724', MSG_DT=u'2019-02-08T13:00:18.565Z', ACTION_CD=u'send', AURA_ID=u'facebook_-_2058564260851298', DURATION_NU=u'1827'),
 Row(USER_ID=u'11995504724', MSG_DT=u'2019-02-08T13:00:21.958Z', ACTION_CD=u'send', AURA_ID=u'facebook_-_2058564260851298', DURATION_NU=u'5220'),
 Row(USER_ID=u'11995504724', MSG_DT=u'2019-02-08T13:00:21.959Z', ACTION_CD=u'send', AURA_ID=u'facebook_-_2058564260851298', DURATION_N

### Transform

In the transform step of our ETL, we add a new column to the messages DataFrame.

This new column holds the average of the *DURATION_NU* column per correlation ID (*CORR_ID*). *DURATION_NU* represents the time the bot took to respond to a given request.

In [10]:
from pyspark.sql.functions import col, udf

Let's add two new columns to the DataFrame: the parsed date (*MSG_DATE*) and the parsed timestamp (*MSG_TS*).

In [11]:
df = df.withColumn('MSG_DATE', col("MSG_DT").cast("date"))
df = df.withColumn('MSG_TS', col("MSG_DT").cast("timestamp"))

In [12]:
df[['MSG_DT', 'MSG_TS']].take(10)

[Row(MSG_DT=u'2019-02-08T13:00:00.527Z', MSG_TS=datetime.datetime(2019, 2, 8, 13, 0, 0, 527000)),
 Row(MSG_DT=u'2019-02-08T13:00:00.530Z', MSG_TS=datetime.datetime(2019, 2, 8, 13, 0, 0, 530000)),
 Row(MSG_DT=u'2019-02-08T13:00:00.531Z', MSG_TS=datetime.datetime(2019, 2, 8, 13, 0, 0, 531000)),
 Row(MSG_DT=u'2019-02-08T13:00:16.738Z', MSG_TS=datetime.datetime(2019, 2, 8, 13, 0, 16, 738000)),
 Row(MSG_DT=u'2019-02-08T13:00:18.565Z', MSG_TS=datetime.datetime(2019, 2, 8, 13, 0, 18, 565000)),
 Row(MSG_DT=u'2019-02-08T13:00:21.958Z', MSG_TS=datetime.datetime(2019, 2, 8, 13, 0, 21, 958000)),
 Row(MSG_DT=u'2019-02-08T13:00:21.959Z', MSG_TS=datetime.datetime(2019, 2, 8, 13, 0, 21, 959000)),
 Row(MSG_DT=u'2019-02-08T13:00:21.961Z', MSG_TS=datetime.datetime(2019, 2, 8, 13, 0, 21, 961000)),
 Row(MSG_DT=u'2019-02-08T13:00:21.960Z', MSG_TS=datetime.datetime(2019, 2, 8, 13, 0, 21, 960000)),
 Row(MSG_DT=u'2019-02-08T13:00:41.421Z', MSG_TS=datetime.datetime(2019, 2, 8, 13, 0, 41, 421000))]

**Add average of DURATION_NU by correlation ID**

In [13]:
df = df.withColumn('DURATION', col("DURATION_NU").cast("double"))

In [14]:
df.select('DURATION').na.fill(0).collect()

[Row(DURATION=6087.0),
 Row(DURATION=6154.0),
 Row(DURATION=6156.0),
 Row(DURATION=0.0),
 Row(DURATION=1827.0),
 Row(DURATION=5220.0),
 Row(DURATION=5222.0),
 Row(DURATION=5224.0),
 Row(DURATION=5223.0),
 Row(DURATION=0.0),
 Row(DURATION=4406.0),
 Row(DURATION=4408.0),
 Row(DURATION=4409.0),
 Row(DURATION=0.0),
 Row(DURATION=2994.0),
 Row(DURATION=0.0),
 Row(DURATION=6577.0),
 Row(DURATION=0.0),
 Row(DURATION=1839.0),
 Row(DURATION=0.0),
 Row(DURATION=15653.0),
 Row(DURATION=0.0),
 Row(DURATION=517.0),
 Row(DURATION=0.0),
 Row(DURATION=1031.0),
 Row(DURATION=0.0),
 Row(DURATION=553.0),
 Row(DURATION=0.0),
 Row(DURATION=377.0),
 Row(DURATION=0.0),
 Row(DURATION=1330.0),
 Row(DURATION=0.0),
 Row(DURATION=3421.0),
 Row(DURATION=0.0),
 Row(DURATION=1373.0),
 Row(DURATION=0.0),
 Row(DURATION=855.0),
 Row(DURATION=0.0),
 Row(DURATION=1684.0),
 Row(DURATION=0.0),
 Row(DURATION=1995.0),
 Row(DURATION=0.0),
 Row(DURATION=492.0),
 Row(DURATION=0.0),
 Row(DURATION=935.0),
 Row(DURATION=0.0),
 Row

In [15]:
df.where(col('DURATION_NU') > 1000).count()

116

In [16]:
df.count()

344

In [17]:
from pyspark.sql import functions as F


In [18]:
# 'receive' messages don't have a duration, so average only the 'send' messages
df_avg_dur = df.where(col('ACTION_CD') == 'send').groupBy('CORR_ID').agg(F.avg('DURATION'))

In [19]:
df_avg_dur.take(5)

[Row(CORR_ID=u'73d483ae-2f0c-57a3-96d4-5d210e61e9a2', avg(DURATION)=6380.0),
 Row(CORR_ID=u'700b8cfb-0207-5ff7-b8ae-54a33c9f03d3', avg(DURATION)=1469.0),
 Row(CORR_ID=u'22279655-6f67-5097-bbda-d359fcd0eb8e', avg(DURATION)=8921.0),
 Row(CORR_ID=u'dc376e83-08b9-58f2-aca4-d3d36379bdd4', avg(DURATION)=1684.0),
 Row(CORR_ID=u'f66036bf-c39d-5e0e-bd20-ae57ad0d7249', avg(DURATION)=1651.0)]

In [20]:
corr_id_dur_map = df_avg_dur.rdd.collectAsMap()

In [21]:
corr_id_dur_map

{u'01c369ea-1e89-59eb-838c-ad178e71f911': 1472.0,
 u'01f33986-ca77-5f48-b874-71b23ac6a6a5': 6577.0,
 u'02ea14cc-7e22-52c6-981b-29f52ab45171': 2342.0,
 u'04409a86-8f47-5780-9ff1-f5bc511fb1a0': 493.0,
 u'05443bfd-8bbe-598e-9a58-f4bd0b8b3c3a': 710.0,
 u'06d6a0b7-74f4-5d5c-9655-370e7e7f288a': 6132.333333333333,
 u'085d83f2-13bf-561c-b54b-e8630e0dfb45': 7009.0,
 u'091e0bd0-ca20-5180-8fc4-9f6b0b0aa2fe': 765.0,
 u'0db59d32-6eff-52ab-b00c-d060d8eb1b7b': 1029.0,
 u'103a9403-d280-534f-8367-e759d39e38cf': 626.0,
 u'13c2f468-c1e3-5e00-892f-9407ab768198': 1870.0,
 u'18328cc2-c1fe-59f4-be81-f5fb38e779f6': 26130.0,
 u'19b000de-49ec-5fee-8073-040592586b70': 451.0,
 u'1a73951f-4870-53be-a643-0bc0dbad95f4': 735.0,
 u'1e16d118-3eec-56e7-a710-b624c3367b35': 307.0,
 u'1e8c92f8-591f-5d22-a34f-62c2a936b8a8': 385.0,
 u'1e9e5623-dfec-5028-b1a5-ed470e9d6504': 4357.0,
 u'1f54ead4-df75-5a9e-9669-0376c49a870d': 819.0,
 u'216a846f-4d1a-5c53-b621-903bd1bf636a': 537.0,
 u'2190d5a8-0595-5a23-b71a-c6dd23e53f2b': 824.0,

In [22]:
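from pyspark.sql.types import DoubleType

# Fill our df with the average duration of each message's correlation ID
# (0.0 when the ID has no 'send' messages); declaring DoubleType avoids a string round trip
lookup_avg = udf(lambda x: corr_id_dur_map.get(x, 0.0), DoubleType())
df = df.withColumn('AVG_DURATION', lookup_avg('CORR_ID'))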

### Load

In the load step of our ETL, we load the data into Elasticsearch so we can visualize it in Kibana later.

In [23]:
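# Save into Elasticsearch; the "org.elasticsearch.spark.sql" data source comes from
# the elasticsearch-hadoop connector, which must be on the Spark classpath.
# es.nodes is the Elasticsearch host; es.resource is the target "index/type".
df.write.format("org.elasticsearch.spark.sql") \
    .option("es.nodes", "elasticsearch") \
    .option("es.resource", "kpi/messages") \
    .save()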
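To sanity-check the load, the number of indexed documents can be compared with `df.count()`. A minimal sketch using the Elasticsearch `_count` REST endpoint from the Python standard library; the `http://elasticsearch:9200` address is an assumption based on the `es.nodes` value above and Elasticsearch's default HTTP port, and `es_count` is a hypothetical helper:

```python
import json
from urllib.request import urlopen


def es_count(index, host="http://elasticsearch:9200"):
    """Return the number of documents in `index` using Elasticsearch's _count API.

    The default host is an assumption based on es.nodes="elasticsearch"
    and Elasticsearch's default HTTP port 9200.
    """
    url = "%s/%s/_count" % (host.rstrip("/"), index)
    with urlopen(url) as resp:
        return json.load(resp)["count"]
```

`es_count("kpi")` returns the count for the `kpi` index. Newly indexed documents only become visible to `_count` after the index refreshes (every second by default), and the comparison with `df.count()` has to happen before `spark.stop()`.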

In [24]:
spark.stop()