In [2]:
import pandas_gbq
import os
import numpy as np
import requests
import datetime
import pandas as pd
import re
import json
from datetime import date, timedelta
from datetime import datetime
import pyspark as ps
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr,first, last,when, split, col,lit, concat, date_format,to_utc_timestamp,to_timestamp, regexp_replace,concat_ws
#client = bigquery.Client(location="us-central1")
#print("Client creating using default project: {}".format(client.project))
pd.set_option('display.width', 1000)
pd.set_option("max_colwidth",10000)
pd.set_option("max_rows",1000)
pd.set_option("max_columns",100)

In [3]:
spark = SparkSession.builder.appName("vt1EventsClassify").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/03 18:09:57 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
23/05/03 18:09:57 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
23/05/03 18:09:57 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
23/05/03 18:09:57 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator


In [44]:
current_date = datetime.today()
dias_atras = 17
str_day = (current_date - timedelta(days = dias_atras)).strftime("%Y-%m-%d")
#STR_YEARMONTH = (current_date - timedelta(days = dias_atras)).strftime("%Y%m")
#str_day = "2022-04-16"
#STR_YEARMONTH = "201908"
hour_ini_1 = '00:00:00'
hour_fin_1 = '23:59:59'
print(str_day,hour_ini_1,hour_fin_1)


2023-04-16 00:00:00 23:59:59


### Read Events Metadata

In [45]:
df_metadata = spark.read.format('bigquery').option('project','saas-analytics-io').option('table','vt1_v1data1.metadata').option("filter", """date(time) = '%s' and time(time)
    between '%s' and '%s'"""%(str_day,hour_ini_1,hour_fin_1)).load()

In [46]:
df_metadata.printSchema()

root
 |-- id: string (nullable = true)
 |-- serialNumber: string (nullable = true)
 |-- tenantCode: string (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- group: long (nullable = true)
 |-- thingType: string (nullable = true)
 |-- bridgeKey: string (nullable = true)
 |-- specName: string (nullable = true)
 |-- priority: long (nullable = true)
 |-- requestId: string (nullable = true)
 |-- action: string (nullable = true)



### Read Events UDF (Filtered for FinOps)

In [47]:
df_udf_test = spark.read.format('bigquery').option('project','saas-analytics-io').option('table','vt1_v1data1.udf').option("filter", """date(time) = '%s' and time(time)
    between '%s' and '%s'"""%(str_day,hour_ini_1,hour_fin_1)).load()

In [48]:
print("Total Registros udf_VT1 ", df_udf_test.count())

Total Registros udf_VT1  36662541


### Data Prepairing

In [49]:
### Metadata Prepairing
df_metadata = df_metadata.withColumn('bridgeKey_2',when((df_metadata["bridgeKey"]=="/SERVICES"), split(df_metadata["bridgeKey"],"/").getItem(1))
                                     .when((df_metadata["bridgeKey"]!='/SERVICES'), split(df_metadata["bridgeKey"],"/").getItem(2)))
df_metadata = df_metadata.withColumn('id_serial_number',concat(df_metadata['id'],lit('-'), df_metadata['serialNumber']))

df_metadata = df_metadata.withColumn('datetime_h', date_format(df_metadata['time'], "d/M/y H"))
df_metadata = df_metadata.withColumn('datetime_h', to_utc_timestamp(to_timestamp(df_metadata['datetime_h'],'d/M/y H'), 'UTC'))

In [50]:
print("Total Registros metadata ", df_metadata.count())
print("Total Registros unicos ", df_metadata.distinct().count())

Total Registros metadata  7012496




Total Registros unicos  6984481


                                                                                

In [51]:
##### UDF Prepairing
df_udf_test = df_udf_test.withColumn('id_serial_number',concat(df_udf_test['metadataId'],lit('-'), df_udf_test['serialNumber']))
df_udf_test = df_udf_test.distinct()

In [52]:
df_udf_test.count()

                                                                                

35837843

In [53]:
df_pivot = df_udf_test.groupBy("id_serial_number").pivot("key").agg(first("value")).fillna('N/A')

                                                                                

In [54]:
df_pivot = df_udf_test.groupBy("id_serial_number").pivot("key").agg(first("value")).fillna('N/A')

                                                                                

In [55]:
#### Merge UDF With Metadata (Spark)
df_full = df_metadata.join(df_pivot,df_metadata.id_serial_number ==  df_pivot.id_serial_number,"outer")
df_full = df_full.fillna('N/A')


In [56]:
df_full = df_full.withColumn('KeyField',concat_ws("-",df_full['tenantCode'],df_full['thingType'],df_full['bridgeKey_2'],df_full['specName'],df_full['source']))

In [57]:
df_full.printSchema()

root
 |-- id: string (nullable = false)
 |-- serialNumber: string (nullable = false)
 |-- tenantCode: string (nullable = false)
 |-- time: timestamp (nullable = true)
 |-- group: long (nullable = true)
 |-- thingType: string (nullable = false)
 |-- bridgeKey: string (nullable = false)
 |-- specName: string (nullable = false)
 |-- priority: long (nullable = true)
 |-- requestId: string (nullable = false)
 |-- action: string (nullable = false)
 |-- bridgeKey_2: string (nullable = false)
 |-- id_serial_number: string (nullable = false)
 |-- datetime_h: timestamp (nullable = true)
 |-- id_serial_number: string (nullable = false)
 |-- Event_Action: string (nullable = false)
 |-- Event_Activity: string (nullable = false)
 |-- Event_Category: string (nullable = false)
 |-- Event_Code: string (nullable = false)
 |-- Event_Count: string (nullable = false)
 |-- Event_Date: string (nullable = false)
 |-- Event_Detail: string (nullable = false)
 |-- Event_DeviceId: string (nullable = false)
 |-- E

In [58]:
#display(df_full)

### Read Business Process List (FeatureSet, Process, SubProcess)

In [59]:
#######List of Business Process
url_1 = 'https://docs.google.com/spreadsheets/d/1u2ND8VO0CifSLwIwcqrQAfYS05JpAahizeeRNYtRHCo/edit#gid=1904341781'
url = url_1.replace('/edit#gid=', '/export?format=csv&gid=')
df_list = pd.read_csv(url, dtype=str)
DF_list = spark.createDataFrame(df_list.astype(str)) 
#df_test = df_test.rename(columns={'Category Long':'CATEGORY_LONG'})

In [60]:
DF_list.show()

+----------+----------------+-------------+---------------+--------+--------------------+----------+-------+----------+
|tenantCode|       thingType|  bridgeKey_2|       specName|  source|          KeyField_L|FeatureSet|Process|SubProcess|
+----------+----------------+-------------+---------------+--------+--------------------+----------+-------+----------+
|      BOLZ|        FACILITY|      RULESET|      SCHEDULED|REP_1393|BOLZ-FACILITY-RUL...|    BOLZ-1| BOLZ-1|    BOLZ-1|
|      BOLZ|            ITEM|          nan|       SERVICES|     nan|BOLZ-ITEM-N/A-SER...|    BOLZ-2| BOLZ-2|    BOLZ-2|
|      BOLZ|            ITEM|     SERVICES|       SERVICES|     nan|BOLZ-ITEM-SERVICE...|    BOLZ-3| BOLZ-3|    BOLZ-3|
|      CTVA|            MCON|   ALEB_MOJIX|            nan|  JSRULE|CTVA-MCON-ALEB_MO...|    CTVA-1| CTVA-1|    CTVA-1|
|      CTVA|NOTIFICATIONTEST|   ALEB_MOJIX|            nan|     nan|CTVA-NOTIFICATION...|    CTVA-2| CTVA-2|    CTVA-2|
|      CTVA|NOTIFICATIONTEST|      RULES

In [61]:
DF_list_f = DF_list[['KeyField_L','FeatureSet','Process','SubProcess']]

In [62]:
df_full_c = df_full.join(DF_list_f,df_full.KeyField == DF_list_f.KeyField_L,"Left")

In [63]:
#Creation of three important fields --> EventSource, StoreCode, Is_InternalEvent
df_full_c = df_full_c.withColumn('EventSource',lit('N/A'))
df_full_c = df_full_c.withColumn('StoreCode',lit('N/A'))
df_full_c = df_full_c.withColumn('Is_InternalEvent',lit('N/A'))

In [64]:
df_full_c = df_full_c.withColumn('time', regexp_replace('time', '2022-12-07', '2022-12-08')) 
df_full_c = df_full_c.withColumn('datetime_h', date_format(df_full_c['time'], "d/M/y H"))
df_full_c = df_full_c.withColumn('datetime_h', to_utc_timestamp(to_timestamp(df_full_c['datetime_h'],'d/M/y H'), 'UTC'))

In [65]:
#display(df_full_c)

In [66]:
#display(df_full_c.groupBy(['KeyField','FeatureSet']).count())

#### Aggregation of Events

In [67]:
df_agg = df_full_c.groupBy(['datetime_h','tenantCode','thingType','specName','bridgeKey_2','source','FeatureSet','Process','SubProcess','EventSource','StoreCode','Is_InternalEvent']).count()

In [68]:
df_agg = df_agg.withColumnRenamed("count","mojix_blink_count")

In [69]:
df_agg_pd = df_agg.toPandas()

                                                                                

In [70]:
table = "saas-analytics-io.processed.vt1_event_classify_finops"
df_agg_pd.to_gbq(table,  if_exists='append')


100%|██████████| 1/1 [00:00<00:00, 9177.91it/s]
----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 59784)
Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/conda/miniconda3/lib/python3.8/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/opt/conda/miniconda3/lib/python3.8/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/conda/miniconda3/lib/python3.8/socketserver.py", line 747, in __init__
    self.handle()
  File "/usr/lib/spark/python/pyspark/accumulators.py", line 262, in handle
    poll(accum_updates)
  File "/usr/lib/spark/python/pyspark/accumulators.py", line 235, in poll
    if func():
  File "/usr/lib/spark/python/pyspark/accumulators.py", line 239, in acc

In [None]:
#df_agg[df_agg['datetime_h'].isNull()].show()