<a href="https://colab.research.google.com/github/JoaoVitorDeOliveira/glovo/blob/main/Glovo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Events Processing

You have been given a dataset (appEventProcessingDataset.tar.gz) of three csv files
containing the following:
1. Client HTTP endpoint polling event data for a set of devices running a web
application.
2. Internet connectivity status logs for the above set of devices, generated when a device
goes offline whilst running the application.
3. Orders data for orders that have been dispatched to devices running the above web
application.

We’re interested in the connectivity environment of a device in the period
surrounding the dispatch of an order to it.
Using Python and any useful libraries, produce a **single CSV-formatted output dataset** that
contains the following information:

For each **order** dispatched to a device:


*   The total count of all polling events
*   The count of each type of polling status_code
*   The count of each type of polling error_code and the count of responses without error codes

...across the following periods of time:



*   **Three minutes before** the order creation time
*   **Three minutes after** the order creation time
*   **One hour before** the order creation time
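
One reading of these periods treats each as a bounded interval anchored at the order creation time. The sketch below, in plain Python rather than Spark, counts one device's polling events per window. The half-open boundaries, the sample events, and the helper name `count_in_windows` are assumptions made for illustration and are not part of the task statement.

```python
from collections import Counter
from datetime import datetime, timedelta

# Hypothetical window definitions: (label, start offset, end offset)
# relative to the order creation time. Boundary handling is an assumption.
WINDOWS = [
    ("minus_three_min", timedelta(minutes=-3), timedelta(0)),
    ("plus_three_min", timedelta(0), timedelta(minutes=3)),
    ("minus_one_hour", timedelta(hours=-1), timedelta(0)),
]

def count_in_windows(order_time, events):
    """Count events per window, per status_code, and per error_code.

    `events` is an iterable of (creation_time, status_code, error_code)
    tuples for the device the order was dispatched to.
    """
    result = {}
    for label, start_offset, end_offset in WINDOWS:
        start, end = order_time + start_offset, order_time + end_offset
        selected = [e for e in events if start <= e[0] < end]
        result[label] = {
            "total": len(selected),
            "status_codes": Counter(e[1] for e in selected),
            # A missing error code is counted under the key None.
            "error_codes": Counter(e[2] for e in selected),
        }
    return result

# Made-up order and polling events for a single device.
order_time = datetime(2020, 2, 26, 12, 0, 0)
events = [
    (datetime(2020, 2, 26, 11, 10, 0), 200, None),             # within the hour before
    (datetime(2020, 2, 26, 11, 58, 30), 0, "ECONNABORTED"),    # within three minutes before
    (datetime(2020, 2, 26, 12, 1, 0), 200, None),              # within three minutes after
    (datetime(2020, 2, 26, 12, 30, 0), 401, "GENERIC_ERROR"),  # outside every window
]
counts = count_in_windows(order_time, events)
```

Keeping the window definitions in one table makes it straightforward to apply the same counting logic to all three periods.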


In [None]:
# All dependencies and configuration needed to create a Spark environment on Colab
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

# make pyspark importable in the environment
import findspark
findspark.init('spark-2.4.4-bin-hadoop2.7')

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
sc = SparkSession.builder.master('local[*]').getOrCreate()

In [None]:
# Download and import csv files to Spark Environment on Colab
!wget --quiet --show-progress -O connectivity_status.csv https://raw.githubusercontent.com/JoaoVitorDeOliveira/glovo/main/connectivity_status.csv
df_connectivity_status = sc.read.csv("./connectivity_status.csv", inferSchema=True, header=True)

!wget --quiet --show-progress -O orders.csv https://raw.githubusercontent.com/JoaoVitorDeOliveira/glovo/main/orders.csv
df_orders = sc.read.csv("./orders.csv", inferSchema=True, header=True)

!wget --quiet --show-progress -O polling.csv https://raw.githubusercontent.com/JoaoVitorDeOliveira/glovo/main/polling.csv
df_polling = sc.read.csv("./polling.csv", inferSchema=True, header=True)



In [None]:
df_orders.show()

+---+-------------------+---------+--------------------+
|_c0|order_creation_time| order_id|           device_id|
+---+-------------------+---------+--------------------+
|  0|2020-02-26 13:29:00|102523405|                null|
|  1|2020-02-26 21:13:24|102641477|                null|
|  2|2020-02-26 17:04:33|102563229|                null|
|  3|2020-02-26 20:07:45|102622016|                null|
|  4|2020-02-26 21:19:25|102643455|                null|
|  5|2020-02-26 00:08:19|102452116|d0bc996f-72d2-4ec...|
|  6|2020-02-26 10:34:36|102492697|2bb11f99-ab21-462...|
|  7|2020-02-26 11:32:47|102500373|2aec0e20-e1d8-432...|
|  8|2020-02-26 11:46:17|102503173|37638585-a181-426...|
|  9|2020-02-26 11:38:40|102501909|53d16c33-5980-4ad...|
| 10|2020-02-26 11:57:53|102504038|3c533ccb-8739-49c...|
| 11|2020-02-26 12:26:11|102510286|82fb1d95-c894-449...|
| 12|2020-02-26 12:46:31|102514893|7995c021-8983-4f5...|
| 13|2020-02-26 12:52:04|102515379|1cdc8ef5-3752-420...|
| 14|2020-02-26 13:40:44|102525

In [None]:
df_connectivity_status.show()

+---+--------------------+-------+--------------------+
|_c0|       creation_time| status|           device_id|
+---+--------------------+-------+--------------------+
|  0|2020-02-26 19:31:...|OFFLINE|00083c70-7f54-432...|
|  1|2020-02-26 19:31:...| ONLINE|00083c70-7f54-432...|
|  2|2020-02-26 20:08:...|OFFLINE|000fec74-a5b4-40f...|
|  3|2020-02-26 20:09:...| ONLINE|000fec74-a5b4-40f...|
|  4|2020-02-26 00:30:...|OFFLINE|001d3c67-99fd-43b...|
|  5|2020-02-26 00:30:...| ONLINE|001d3c67-99fd-43b...|
|  6|2020-02-26 09:02:...|OFFLINE|002315d2-9ec3-468...|
|  7|2020-02-26 09:02:...| ONLINE|002315d2-9ec3-468...|
|  8|2020-02-26 03:20:...|OFFLINE|002ec021-a3e8-473...|
|  9|2020-02-26 03:20:...|OFFLINE|002ec021-a3e8-473...|
| 10|2020-02-26 03:20:...| ONLINE|002ec021-a3e8-473...|
| 11|2020-02-26 03:20:...| ONLINE|002ec021-a3e8-473...|
| 12|2020-02-26 09:41:...|OFFLINE|002ec021-a3e8-473...|
| 13|2020-02-26 09:41:...|OFFLINE|002ec021-a3e8-473...|
| 14|2020-02-26 09:41:...| ONLINE|002ec021-a3e8-

In [None]:
df_polling.show()

+---+--------------------+--------------------+----------+-----------+
|_c0|       creation_time|           device_id|error_code|status_code|
+---+--------------------+--------------------+----------+-----------+
|  0|2020-02-26 19:16:...|d0460656-95e0-484...|      null|        200|
|  1|2020-02-26 19:16:...|d0460656-95e0-484...|      null|        200|
|  2|2020-02-26 18:31:...|d0460656-95e0-484...|      null|        200|
|  3|2020-02-26 18:31:...|d0460656-95e0-484...|      null|        200|
|  4|2020-02-26 18:31:...|d0460656-95e0-484...|      null|        200|
|  5|2020-02-26 18:30:...|d0460656-95e0-484...|      null|        200|
|  6|2020-02-26 18:30:...|d0460656-95e0-484...|      null|        200|
|  7|2020-02-26 18:30:...|d0460656-95e0-484...|      null|        200|
|  8|2020-02-26 18:30:...|d0460656-95e0-484...|      null|        200|
|  9|2020-02-26 18:30:...|d0460656-95e0-484...|      null|        200|
| 10|2020-02-26 18:30:...|d0460656-95e0-484...|      null|        200|
| 11|2

In [None]:
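# Attach polling and connectivity events to every order via device_id.
# The outer joins followed by the order_creation_time filter keep only
# rows that belong to an order.
df_enhanced = (df_orders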
              .join(df_polling, 'device_id', 'outer')
              .withColumnRenamed('creation_time', 'polling_creation_time')
              .join(df_connectivity_status, 'device_id', 'outer')
              .withColumnRenamed('creation_time', 'status_creation_time')
              .drop('_c0')
              .where('order_creation_time IS NOT NULL'))
df_enhanced.show()

+--------------------+-------------------+---------+---------------------+------------+-----------+--------------------+------+
|           device_id|order_creation_time| order_id|polling_creation_time|  error_code|status_code|status_creation_time|status|
+--------------------+-------------------+---------+---------------------+------------+-----------+--------------------+------+
|4faa203e-f560-49a...|2020-02-26 16:04:44|102553299| 2020-02-26 16:12:...|        null|        200|                null|  null|
|4faa203e-f560-49a...|2020-02-26 16:04:44|102553299| 2020-02-26 16:11:...|        null|        200|                null|  null|
|4faa203e-f560-49a...|2020-02-26 16:04:44|102553299| 2020-02-26 16:11:...|        null|        200|                null|  null|
|4faa203e-f560-49a...|2020-02-26 16:04:44|102553299| 2020-02-26 16:10:...|        null|        200|                null|  null|
|4faa203e-f560-49a...|2020-02-26 16:04:44|102553299| 2020-02-26 16:09:...|        null|        200|     

In [None]:
df_enhanced.createOrReplaceTempView('ORDERS')
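
Because both joins above use only `device_id` as the key, each polling row is paired with every connectivity row of the same device. The plain-Python sketch below reproduces that effect on made-up rows (the device name and row counts are hypothetical) to show how it can inflate a later `count(1)`. Counting polling events from a join of orders and polling alone would avoid the repetition.

```python
from itertools import product

# Hypothetical rows for a single device: one order, three polling
# events, and two connectivity status events.
orders = [("device-a", "order-1")]
polling = [("device-a", f"poll-{i}") for i in range(3)]
status = [("device-a", "OFFLINE"), ("device-a", "ONLINE")]

def equi_join(left, right):
    """Join two lists of tuples on their first field (the device id)."""
    return [l + r[1:] for l, r in product(left, right) if l[0] == r[0]]

orders_polling = equi_join(orders, polling)
orders_polling_status = equi_join(orders_polling, status)

# The three-way join yields 3 * 2 rows, so a row count no longer
# equals the number of polling events.
polling_rows = len(orders_polling)
joined_rows = len(orders_polling_status)
```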

## The total count of all polling events Three minutes before the order creation time
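
The filter `polling_creation_time <= (order_creation_time - INTERVAL 3 minutes)` used in the next cell keeps every polling event older than three minutes before the order. If the intended period is the three minutes immediately preceding the order, a bounded range is needed. The sketch below illustrates the difference with Python's built-in `sqlite3` module on made-up rows. SQLite's `datetime(..., '-3 minutes')` modifier stands in for Spark SQL's `INTERVAL` arithmetic, and the table name and contents are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders_polling ("
    "order_id INTEGER, order_creation_time TEXT, polling_creation_time TEXT)"
)
# Hypothetical rows: one order at 12:00:00 with four polling events.
rows = [
    (1, "2020-02-26 12:00:00", "2020-02-26 09:00:00"),  # three hours before
    (1, "2020-02-26 12:00:00", "2020-02-26 10:30:00"),  # ninety minutes before
    (1, "2020-02-26 12:00:00", "2020-02-26 11:56:00"),  # four minutes before
    (1, "2020-02-26 12:00:00", "2020-02-26 11:58:00"),  # two minutes before
]
conn.executemany("INSERT INTO orders_polling VALUES (?, ?, ?)", rows)

# Open-ended filter: everything at or before (order time - 3 minutes).
open_ended = conn.execute(
    "SELECT count(*) FROM orders_polling "
    "WHERE polling_creation_time <= datetime(order_creation_time, '-3 minutes')"
).fetchone()[0]

# Bounded filter: only the three minutes leading up to the order.
bounded = conn.execute(
    "SELECT count(*) FROM orders_polling "
    "WHERE polling_creation_time >= datetime(order_creation_time, '-3 minutes') "
    "AND polling_creation_time < order_creation_time"
).fetchone()[0]
conn.close()
```

In the notebook's Spark SQL dialect the bounded condition would presumably read `polling_creation_time >= order_creation_time - INTERVAL 3 minutes AND polling_creation_time < order_creation_time`. Whether the boundaries should be inclusive is a choice the task statement leaves open.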



In [None]:
query = '''SELECT order_id, count(1) as all_polling_events_minus_three_min
           FROM ORDERS
           WHERE polling_creation_time IS NOT NULL
           AND polling_creation_time <= (order_creation_time - INTERVAL 3 minutes) 
           GROUP BY order_id
        '''
all_polling_minus_three_min = sc.sql(query)
all_polling_minus_three_min.show()

+---------+----------------------------------+
| order_id|all_polling_events_minus_three_min|
+---------+----------------------------------+
|102638473|                              1443|
|102513750|                               942|
|102609981|                              3105|
|102603465|                              3036|
|102565868|                               465|
|102535727|                              3161|
|102651853|                               550|
|102602920|                               310|
|102594830|                               634|
|102551214|                              3752|
|102633195|                              2669|
|102516534|                              2388|
|102650480|                              2707|
|102540074|                              2036|
|102650281|                              3781|
|102648123|                              2353|
|102517631|                              2804|
|102571599|                                41|
|102644931|  

## The total count of all polling events Three minutes after the order creation time

In [None]:
query = '''SELECT order_id, count(1) as all_polling_events_plus_three_min
           FROM ORDERS
           WHERE polling_creation_time IS NOT NULL
           AND polling_creation_time <= (order_creation_time + INTERVAL 3 minutes) 
           GROUP BY order_id
        '''
all_polling_plus_three_min = sc.sql(query)
all_polling_plus_three_min.show()

+---------+---------------------------------+
| order_id|all_polling_events_plus_three_min|
+---------+---------------------------------+
|102638473|                             1443|
|102513750|                              988|
|102609981|                             3128|
|102603465|                             3059|
|102565868|                              485|
|102535727|                             3172|
|102651853|                              550|
|102602920|                              311|
|102594830|                              646|
|102551214|                             3798|
|102633195|                             2693|
|102516534|                             2480|
|102650480|                             2730|
|102540074|                             2059|
|102650281|                             3799|
|102648123|                             2375|
|102517631|                             2827|
|102571599|                               41|
|102644931|                       

## The total count of all polling events One hour before the order creation time

In [None]:
query = '''SELECT order_id, count(1) as all_polling_events_minus_one_hour
           FROM ORDERS
           WHERE polling_creation_time IS NOT NULL
           AND polling_creation_time <= (order_creation_time - INTERVAL 1 hour) 
           GROUP BY order_id
        '''
all_polling_minus_one_hour = sc.sql(query)
all_polling_minus_one_hour.show()

+---------+---------------------------------+
| order_id|all_polling_events_minus_one_hour|
+---------+---------------------------------+
|102638473|                             1443|
|102513750|                              500|
|102609981|                             2885|
|102603465|                             2814|
|102565868|                              449|
|102535727|                             2941|
|102651853|                              439|
|102602920|                              270|
|102594830|                              621|
|102551214|                             3314|
|102633195|                             2447|
|102516534|                             1516|
|102650480|                             2486|
|102540074|                             1816|
|102648123|                             2155|
|102650281|                             3370|
|102517631|                             2582|
|102571599|                               41|
|102644931|                       

In [None]:
df_all_polling_events = all_polling_minus_three_min.join(all_polling_plus_three_min, 'order_id').join(all_polling_minus_one_hour, 'order_id')
df_all_polling_events.show()

+---------+----------------------------------+---------------------------------+---------------------------------+
| order_id|all_polling_events_minus_three_min|all_polling_events_plus_three_min|all_polling_events_minus_one_hour|
+---------+----------------------------------+---------------------------------+---------------------------------+
|102513750|                               942|                              988|                              500|
|102535727|                              3161|                             3172|                             2941|
|102551214|                              3752|                             3798|                             3314|
|102565868|                               465|                              485|                              449|
|102594830|                               634|                              646|                              621|
|102602920|                               310|                              311|
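
A PySpark `join` without an explicit join type is an inner join, so an order that is missing from any one of the three per-window tables is dropped from the combined result. If every order should appear, with zero standing in for a missing count, the combination would behave like the plain-Python sketch below. The order ids and counts are made up.

```python
# Hypothetical per-window counts keyed by order id.
minus_three_min = {1: 4, 2: 7}
plus_three_min = {1: 5}            # order 2 has no events in this window
minus_one_hour = {1: 40, 2: 65}
tables = [minus_three_min, plus_three_min, minus_one_hour]

# Inner join: keep only order ids present in every table.
inner_ids = set.intersection(*(set(t) for t in tables))
inner = {oid: [t[oid] for t in tables] for oid in sorted(inner_ids)}

# Full outer join with zero fill: keep every order id seen anywhere.
all_ids = set.union(*(set(t) for t in tables))
outer = {oid: [t.get(oid, 0) for t in tables] for oid in sorted(all_ids)}
```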

## The count of each type of polling status_code Three minutes before the order creation time
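
Pivoting turns each distinct `status_code` into its own count column, and a code that never occurs for an order shows up as `null`. The plain-Python sketch below shows the same reshaping with a fixed list of expected codes and zeros for absent ones. The list of codes is taken from the pivoted columns in this notebook, while the rows and order ids are made up.

```python
from collections import Counter, defaultdict

# Status codes assumed to occur in the data; the rows are hypothetical.
STATUS_CODES = [0, 200, 401]
rows = [
    (1, 200), (1, 200), (1, 0),  # (order_id, status_code)
    (2, 200),
]

def pivot_counts(rows, values):
    """Return {order_id: {value: count}} with a zero for absent values."""
    counters = defaultdict(Counter)
    for order_id, status_code in rows:
        counters[order_id][status_code] += 1
    return {
        order_id: {value: counter[value] for value in values}
        for order_id, counter in counters.items()
    }

pivoted = pivot_counts(rows, STATUS_CODES)
```

In PySpark the same idea corresponds to passing the value list as the second argument of `pivot` and replacing the resulting nulls with `fillna(0)`.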

In [None]:
# Performance can be improved by passing the expected values to pivot as a second argument: ['0', '200', '401']

query = '''SELECT order_id, status_code
           FROM ORDERS
           WHERE polling_creation_time <= (order_creation_time - INTERVAL 3 minutes) 
       '''
# The same filter can also be expressed through the DataFrame API:
# each_poll_status_code_minus_three_min = df_enhanced.filter(f.col('polling_creation_time') <= (f.col("order_creation_time") - f.expr("INTERVAL 3 minutes"))).select('order_id', 'status_code')

each_poll_status_code_minus_three_min = sc.sql(query)
each_poll_status_code_minus_three_min = each_poll_status_code_minus_three_min.groupBy('order_id').pivot('status_code').count()
each_poll_status_code_minus_three_min = each_poll_status_code_minus_three_min.withColumnRenamed('0', 'status_code_0_minus_three_min')\
                                                                              .withColumnRenamed('200', 'status_code_200_minus_three_min')\
                                                                              .withColumnRenamed('401', 'status_code_401_minus_three_min')
each_poll_status_code_minus_three_min.show()

+---------+-----------------------------+-------------------------------+-------------------------------+
| order_id|status_code_0_minus_three_min|status_code_200_minus_three_min|status_code_401_minus_three_min|
+---------+-----------------------------+-------------------------------+-------------------------------+
|102603465|                            1|                           3035|                           null|
|102551214|                         null|                           3752|                           null|
|102594830|                            3|                            630|                              1|
|102609981|                            1|                           3104|                           null|
|102651853|                            5|                            544|                              1|
|102535727|                            1|                           3160|                           null|
|102602920|                            2|     

## The count of each type of polling status_code Three minutes after the order creation time

In [None]:
query = '''SELECT order_id, status_code
           FROM ORDERS
           WHERE polling_creation_time <= (order_creation_time + INTERVAL 3 minutes) 
       '''
each_poll_status_code_plus_three_min = sc.sql(query)
each_poll_status_code_plus_three_min = each_poll_status_code_plus_three_min.groupBy('order_id').pivot('status_code').count()
each_poll_status_code_plus_three_min = each_poll_status_code_plus_three_min.withColumnRenamed('0', 'status_code_0_plus_three_min')\
                                                                              .withColumnRenamed('200', 'status_code_200_plus_three_min')\
                                                                              .withColumnRenamed('401', 'status_code_401_plus_three_min')
each_poll_status_code_plus_three_min.show()

+---------+----------------------------+------------------------------+------------------------------+
| order_id|status_code_0_plus_three_min|status_code_200_plus_three_min|status_code_401_plus_three_min|
+---------+----------------------------+------------------------------+------------------------------+
|102603465|                           1|                          3058|                          null|
|102551214|                        null|                          3798|                          null|
|102594830|                           3|                           642|                             1|
|102609981|                           1|                          3127|                          null|
|102651853|                           5|                           544|                             1|
|102535727|                           1|                          3171|                          null|
|102602920|                           2|                           308|  

## The count of each type of polling status_code One hour before the order creation time

In [None]:
query = '''SELECT order_id, status_code
           FROM ORDERS
           WHERE polling_creation_time <= (order_creation_time - INTERVAL 1 hour) 
       '''

each_poll_status_code_minus_one_hour = sc.sql(query)
each_poll_status_code_minus_one_hour = each_poll_status_code_minus_one_hour.groupBy('order_id').pivot('status_code').count()
each_poll_status_code_minus_one_hour = each_poll_status_code_minus_one_hour.withColumnRenamed('0', 'status_code_0_minus_one_hour')\
                                                                              .withColumnRenamed('200', 'status_code_200_minus_one_hour')\
                                                                              .withColumnRenamed('401', 'status_code_401_minus_one_hour')
each_poll_status_code_minus_one_hour.show()

+---------+----------------------------+------------------------------+------------------------------+
| order_id|status_code_0_minus_one_hour|status_code_200_minus_one_hour|status_code_401_minus_one_hour|
+---------+----------------------------+------------------------------+------------------------------+
|102603465|                           1|                          2813|                          null|
|102551214|                        null|                          3314|                          null|
|102594830|                           3|                           618|                          null|
|102609981|                           1|                          2884|                          null|
|102651853|                           3|                           435|                             1|
|102535727|                           1|                          2940|                          null|
|102602920|                           1|                           268|  

In [None]:
df_each_poll_status_code_count = each_poll_status_code_minus_three_min.join(each_poll_status_code_plus_three_min, 'order_id').join(each_poll_status_code_minus_one_hour, 'order_id')
df_each_poll_status_code_count.show()

+---------+-----------------------------+-------------------------------+-------------------------------+----------------------------+------------------------------+------------------------------+----------------------------+------------------------------+------------------------------+
| order_id|status_code_0_minus_three_min|status_code_200_minus_three_min|status_code_401_minus_three_min|status_code_0_plus_three_min|status_code_200_plus_three_min|status_code_401_plus_three_min|status_code_0_minus_one_hour|status_code_200_minus_one_hour|status_code_401_minus_one_hour|
+---------+-----------------------------+-------------------------------+-------------------------------+----------------------------+------------------------------+------------------------------+----------------------------+------------------------------+------------------------------+
|102513750|                         null|                            942|                           null|                        null|  

## The count of each type of polling error_code Three minutes before the order creation time

In [None]:
query = '''SELECT order_id, error_code
           FROM ORDERS
           WHERE polling_creation_time <= (order_creation_time - INTERVAL 3 minutes)
           AND error_code IS NOT NULL
        '''
each_poll_error_code_minus_three_min = sc.sql(query)
each_poll_error_code_minus_three_min = each_poll_error_code_minus_three_min.groupBy('order_id').pivot('error_code').count()
each_poll_error_code_minus_three_min = each_poll_error_code_minus_three_min.withColumnRenamed('ECONNABORTED', 'ECONNABORTED_ERROR_minus_three_min')\
                                                                              .withColumnRenamed('GENERIC_ERROR', 'GENERIC_ERROR_minus_three_min')
each_poll_error_code_minus_three_min.show()

+---------+----------------------------------+-----------------------------+
| order_id|ECONNABORTED_ERROR_minus_three_min|GENERIC_ERROR_minus_three_min|
+---------+----------------------------------+-----------------------------+
|102603465|                                 1|                         null|
|102602920|                                 2|                            1|
|102651853|                                 5|                            1|
|102609981|                                 1|                         null|
|102535727|                                 1|                         null|
|102594830|                                 3|                            1|
|102644931|                                 3|                            1|
|102540074|                                 1|                         null|
|102633195|                                 1|                            1|
|102650480|                                 1|                         null|

## The count of each type of polling error_code Three minutes after the order creation time

In [None]:
query = '''SELECT order_id, error_code
           FROM ORDERS
           WHERE polling_creation_time <= (order_creation_time + INTERVAL 3 minutes)
           AND error_code IS NOT NULL
        '''
each_poll_error_code_plus_three_min = sc.sql(query)
each_poll_error_code_plus_three_min = each_poll_error_code_plus_three_min.groupBy('order_id').pivot('error_code').count()
each_poll_error_code_plus_three_min = each_poll_error_code_plus_three_min.withColumnRenamed('ECONNABORTED', 'ECONNABORTED_ERROR_plus_three_min')\
                                                                              .withColumnRenamed('GENERIC_ERROR', 'GENERIC_ERROR_plus_three_min')
each_poll_error_code_plus_three_min.show()

+---------+---------------------------------+----------------------------+
| order_id|ECONNABORTED_ERROR_plus_three_min|GENERIC_ERROR_plus_three_min|
+---------+---------------------------------+----------------------------+
|102603465|                                1|                        null|
|102602920|                                2|                           1|
|102651853|                                5|                           1|
|102609981|                                1|                        null|
|102535727|                                1|                        null|
|102594830|                                3|                           1|
|102644931|                                3|                           1|
|102540074|                                1|                        null|
|102633195|                                1|                           1|
|102650480|                                1|                        null|
|102541072|              

## The count of each type of polling error_code One hour before the order creation time

In [None]:
query = '''SELECT order_id, error_code
           FROM ORDERS
           WHERE polling_creation_time <= (order_creation_time - INTERVAL 1 hour)
           AND error_code IS NOT NULL
        '''
each_poll_error_code_minus_one_hour = sc.sql(query)
each_poll_error_code_minus_one_hour = each_poll_error_code_minus_one_hour.groupBy('order_id').pivot('error_code').count()
each_poll_error_code_minus_one_hour = each_poll_error_code_minus_one_hour.withColumnRenamed('ECONNABORTED', 'ECONNABORTED_ERROR_minus_one_hour')\
                                                                              .withColumnRenamed('GENERIC_ERROR', 'GENERIC_ERROR_minus_one_hour')
each_poll_error_code_minus_one_hour.show()

+---------+---------------------------------+----------------------------+
| order_id|ECONNABORTED_ERROR_minus_one_hour|GENERIC_ERROR_minus_one_hour|
+---------+---------------------------------+----------------------------+
|102603465|                                1|                        null|
|102602920|                                1|                           1|
|102651853|                                3|                           1|
|102609981|                                1|                        null|
|102535727|                                1|                        null|
|102594830|                                3|                        null|
|102644931|                                3|                           1|
|102540074|                                1|                        null|
|102633195|                                1|                           1|
|102650480|                                1|                        null|
|102648123|              

In [None]:
df_each_poll_error_code_count = each_poll_error_code_minus_three_min.join(each_poll_error_code_plus_three_min, 'order_id').join(each_poll_error_code_minus_one_hour, 'order_id')
df_each_poll_error_code_count.show()

+---------+----------------------------------+-----------------------------+---------------------------------+----------------------------+---------------------------------+----------------------------+
| order_id|ECONNABORTED_ERROR_minus_three_min|GENERIC_ERROR_minus_three_min|ECONNABORTED_ERROR_plus_three_min|GENERIC_ERROR_plus_three_min|ECONNABORTED_ERROR_minus_one_hour|GENERIC_ERROR_minus_one_hour|
+---------+----------------------------------+-----------------------------+---------------------------------+----------------------------+---------------------------------+----------------------------+
|102535727|                                 1|                         null|                                1|                        null|                                1|                        null|
|102594830|                                 3|                            1|                                3|                           1|                                3|               

## The count of responses without error codes Three minutes before the order creation time

In [None]:
query = '''SELECT order_id, error_code
           FROM ORDERS
           WHERE polling_creation_time <= (order_creation_time - INTERVAL 3 minutes)
           AND error_code IS NULL
        '''

each_poll_without_error_code_minus_three_min = sc.sql(query)
each_poll_without_error_code_minus_three_min = each_poll_without_error_code_minus_three_min.groupBy('order_id').pivot('error_code').count()
each_poll_without_error_code_minus_three_min = each_poll_without_error_code_minus_three_min.withColumnRenamed('null', 'responses_without_error_minus_three_min')

each_poll_without_error_code_minus_three_min.show()

+---------+---------------------------------------+
| order_id|responses_without_error_minus_three_min|
+---------+---------------------------------------+
|102638473|                                   1443|
|102513750|                                    942|
|102609981|                                   3104|
|102603465|                                   3035|
|102565868|                                    465|
|102535727|                                   3160|
|102651853|                                    544|
|102602920|                                    307|
|102594830|                                    630|
|102551214|                                   3752|
|102633195|                                   2667|
|102516534|                                   2388|
|102650480|                                   2706|
|102540074|                                   2035|
|102650281|                                   3781|
|102648123|                                   2347|
|102517631| 

## The count of responses without error codes Three minutes after the order creation time

In [None]:
query = '''SELECT order_id, error_code
           FROM ORDERS
           WHERE polling_creation_time <= (order_creation_time + INTERVAL 3 minutes)
           AND error_code IS NULL
        '''

each_poll_without_error_code_plus_three_min = sc.sql(query)
each_poll_without_error_code_plus_three_min = each_poll_without_error_code_plus_three_min.groupBy('order_id').pivot('error_code').count()
each_poll_without_error_code_plus_three_min = each_poll_without_error_code_plus_three_min.withColumnRenamed('null', 'responses_without_error_plus_three_min')

each_poll_without_error_code_plus_three_min.show()

+---------+--------------------------------------+
| order_id|responses_without_error_plus_three_min|
+---------+--------------------------------------+
|102638473|                                  1443|
|102513750|                                   988|
|102609981|                                  3127|
|102603465|                                  3058|
|102565868|                                   485|
|102535727|                                  3171|
|102651853|                                   544|
|102602920|                                   308|
|102594830|                                   642|
|102551214|                                  3798|
|102633195|                                  2691|
|102516534|                                  2480|
|102650480|                                  2729|
|102540074|                                  2058|
|102650281|                                  3799|
|102648123|                                  2369|
|102517631|                    

## The count of responses without error codes One hour before the order creation time

In [None]:
query = '''SELECT order_id, error_code
           FROM ORDERS
           WHERE polling_creation_time <= (order_creation_time - INTERVAL 1 hour)
           AND error_code IS NULL
        '''

each_poll_without_error_code_minus_one_hour = sc.sql(query)
each_poll_without_error_code_minus_one_hour = each_poll_without_error_code_minus_one_hour.groupBy('order_id').pivot('error_code').count()
each_poll_without_error_code_minus_one_hour = each_poll_without_error_code_minus_one_hour.withColumnRenamed('null', 'responses_without_error_minus_one_hour')

each_poll_without_error_code_minus_one_hour.show()

+---------+--------------------------------------+
| order_id|responses_without_error_minus_one_hour|
+---------+--------------------------------------+
|102638473|                                  1443|
|102513750|                                   500|
|102609981|                                  2884|
|102603465|                                  2813|
|102565868|                                   449|
|102535727|                                  2940|
|102651853|                                   435|
|102602920|                                   268|
|102594830|                                   618|
|102551214|                                  3314|
|102633195|                                  2445|
|102516534|                                  1516|
|102650480|                                  2485|
|102540074|                                  1815|
|102648123|                                  2149|
|102650281|                                  3370|
|102517631|                    

In [None]:
df_responses_without_error_count = each_poll_without_error_code_minus_three_min.join(each_poll_without_error_code_plus_three_min, 'order_id').join(each_poll_without_error_code_minus_one_hour, 'order_id')
df_responses_without_error_count.show()

+---------+---------------------------------------+--------------------------------------+--------------------------------------+
| order_id|responses_without_error_minus_three_min|responses_without_error_plus_three_min|responses_without_error_minus_one_hour|
+---------+---------------------------------------+--------------------------------------+--------------------------------------+
|102513750|                                    942|                                   988|                                   500|
|102535727|                                   3160|                                  3171|                                  2940|
|102551214|                                   3752|                                  3798|                                  3314|
|102565868|                                    465|                                   485|                                   449|
|102594830|                                    630|                                   642|
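
Spark's `join(other, 'order_id')` defaults to an inner join, so an order that is missing from any one of the three inputs (for example, an order with no error-free polls in one period) drops out of the combined result. The sketch below illustrates the difference on made-up data, again with the standard-library `sqlite3` module rather than Spark. The `all_orders` table and the sample counts are hypothetical, and filling a missing count with 0 is an assumption about the desired output.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.executescript('''
    CREATE TABLE all_orders (order_id INTEGER);
    CREATE TABLE minus_three_min (order_id INTEGER, responses_without_error_minus_three_min INTEGER);
    CREATE TABLE minus_one_hour (order_id INTEGER, responses_without_error_minus_one_hour INTEGER);
    INSERT INTO all_orders VALUES (1), (2);
    INSERT INTO minus_three_min VALUES (1, 5), (2, 7);
    INSERT INTO minus_one_hour VALUES (1, 3);   -- order 2 has no row for this period
''')

# Inner join: order 2 disappears because it is missing from minus_one_hour.
inner_rows = conn.execute('''
    SELECT m3.order_id,
           m3.responses_without_error_minus_three_min,
           h1.responses_without_error_minus_one_hour
    FROM minus_three_min AS m3
    JOIN minus_one_hour AS h1 ON h1.order_id = m3.order_id
    ORDER BY m3.order_id''').fetchall()

# Left joins from the full order list keep every order; missing counts become 0.
left_rows = conn.execute('''
    SELECT o.order_id,
           COALESCE(m3.responses_without_error_minus_three_min, 0),
           COALESCE(h1.responses_without_error_minus_one_hour, 0)
    FROM all_orders AS o
    LEFT JOIN minus_three_min AS m3 ON m3.order_id = o.order_id
    LEFT JOIN minus_one_hour AS h1 ON h1.order_id = o.order_id
    ORDER BY o.order_id''').fetchall()

print(inner_rows)
print(left_rows)
```

In the DataFrame API the analogous change would be to start from a DataFrame of all order ids, pass `how='left'` to each `join`, and call `fillna(0)` on the result.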

In [None]:
df_result = df_all_polling_events.join(df_each_poll_status_code_count, 'order_id')\
                                 .join(df_each_poll_error_code_count, 'order_id')\
                                 .join(df_responses_without_error_count, 'order_id')

columns = ['order_id',
           'all_polling_events_minus_three_min',
           'status_code_0_minus_three_min',
           'status_code_200_minus_three_min',
           'status_code_401_minus_three_min',
           'ECONNABORTED_ERROR_minus_three_min',
           'GENERIC_ERROR_minus_three_min',
           'responses_without_error_minus_three_min',
           'all_polling_events_plus_three_min',
           'status_code_0_plus_three_min',
           'status_code_200_plus_three_min',
           'status_code_401_plus_three_min',
           'ECONNABORTED_ERROR_plus_three_min',
           'GENERIC_ERROR_plus_three_min',
           'responses_without_error_plus_three_min',
           'all_polling_events_minus_one_hour',
           'status_code_0_minus_one_hour',
           'status_code_200_minus_one_hour',
           'status_code_401_minus_one_hour',
           'ECONNABORTED_ERROR_minus_one_hour',
           'GENERIC_ERROR_minus_one_hour',
           'responses_without_error_minus_one_hour']

df_result.select(columns).show()

+---------+----------------------------------+-----------------------------+-------------------------------+-------------------------------+----------------------------------+-----------------------------+---------------------------------------+---------------------------------+----------------------------+------------------------------+------------------------------+---------------------------------+----------------------------+--------------------------------------+---------------------------------+----------------------------+------------------------------+------------------------------+---------------------------------+----------------------------+--------------------------------------+
| order_id|all_polling_events_minus_three_min|status_code_0_minus_three_min|status_code_200_minus_three_min|status_code_401_minus_three_min|ECONNABORTED_ERROR_minus_three_min|GENERIC_ERROR_minus_three_min|responses_without_error_minus_three_min|all_polling_events_plus_three_min|status_code_0_plus_th

In addition to the above, across an unbounded period of time, we would like to know:
1. The time of the polling event immediately preceding, and immediately following the
order creation time.

2. The most recent connectivity status (“ONLINE” or “OFFLINE”) before an order, and at
what time the order changed to this status. This can be across any period of time
before the order creation time. Not all devices have a connectivity status.

##Unfortunately, I didn't have time to implement this part. For the first item I would try something like LEAD and LAG to find the preceding and following polling events; for the second item, a filter that keeps the last status whose creation time is before the order creation time.
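
The two unimplemented items could be approached roughly as follows. This is only a sketch under several assumptions: it swaps the LEAD/LAG idea for conditional `MAX`/`MIN` aggregation on the first item, and uses `ROW_NUMBER` to pick the latest status on the second. The `device_id` column, the `CONNECTIVITY` table with its `status` and `status_time` columns, and all sample rows are hypothetical. It runs on the standard-library `sqlite3` module (window functions need SQLite 3.25 or newer) and has not been run against the Spark views in this notebook.

```python
import sqlite3

conn = sqlite3.connect(':memory:')

# ORDERS mimics the joined orders/polls view queried above; device_id is assumed.
conn.execute('CREATE TABLE ORDERS (order_id INTEGER, device_id TEXT, '
             'order_creation_time TEXT, polling_creation_time TEXT)')
conn.executemany('INSERT INTO ORDERS VALUES (?, ?, ?, ?)', [
    (1, 'dev-a', '2020-01-01 12:00:00', '2020-01-01 11:58:00'),
    (1, 'dev-a', '2020-01-01 12:00:00', '2020-01-01 11:59:30'),
    (1, 'dev-a', '2020-01-01 12:00:00', '2020-01-01 12:00:20'),
    (2, 'dev-b', '2020-01-01 13:00:00', '2020-01-01 12:59:00'),
])

# CONNECTIVITY is a hypothetical table of status logs; dev-b has none.
conn.execute('CREATE TABLE CONNECTIVITY (device_id TEXT, status TEXT, status_time TEXT)')
conn.executemany('INSERT INTO CONNECTIVITY VALUES (?, ?, ?)', [
    ('dev-a', 'OFFLINE', '2020-01-01 09:00:00'),
    ('dev-a', 'ONLINE',  '2020-01-01 09:05:00'),
    ('dev-a', 'OFFLINE', '2020-01-01 12:30:00'),  # logged after order 1
])

# Item 1: latest poll before, and earliest poll after, each order.
poll_neighbours = conn.execute('''
    SELECT order_id,
           MAX(CASE WHEN polling_creation_time < order_creation_time
                    THEN polling_creation_time END) AS preceding_poll_time,
           MIN(CASE WHEN polling_creation_time > order_creation_time
                    THEN polling_creation_time END) AS following_poll_time
    FROM ORDERS
    GROUP BY order_id
    ORDER BY order_id''').fetchall()

# Item 2: most recent status logged before each order (NULLs when a device has none).
latest_status = conn.execute('''
    SELECT order_id, status, status_time
    FROM (
        SELECT o.order_id, c.status, c.status_time,
               ROW_NUMBER() OVER (PARTITION BY o.order_id
                                  ORDER BY c.status_time DESC) AS rn
        FROM (SELECT DISTINCT order_id, device_id, order_creation_time FROM ORDERS) AS o
        LEFT JOIN CONNECTIVITY AS c
          ON c.device_id = o.device_id
         AND c.status_time < o.order_creation_time
    ) AS ranked
    WHERE rn = 1
    ORDER BY order_id''').fetchall()

print(poll_neighbours)
print(latest_status)
```

The second query reports the time of the latest log row, which equals the time the device changed status only if consecutive rows never repeat the same status.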

In [None]:
df_result.toPandas().to_csv('result_glovo.csv', index=False)