In [1]:
import sys
import os
import csv
import time
import findspark
findspark.init('/Users/georgepatrick/spark-2.4.3')

In [2]:
from pyspark.sql import SQLContext, SparkSession, Row
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import *

In [3]:
spark = SparkSession.builder.master("local[2]").appName("link").getOrCreate()
sc = spark.sparkContext
spark.sparkContext.setLogLevel("ERROR")

In [4]:
from pyspark.sql.functions import udf, struct, split, when, regexp_extract
from pyspark.sql.types import IntegerType, FloatType, ArrayType
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql import functions as F

In [5]:
#import the text file 
txt = sc.textFile("/Users/georgepatrick/Logismos/output_debug.txt")

In [6]:
#create an rdd 
rdd = txt.mapPartitions(lambda x: csv.reader(x, delimiter=" "))

In [7]:
#tranform rdd to dataframe
df1 = rdd.toDF(['date','time1','time2','extra_space','tz','id','label'])
#df1.show(34)

In [25]:
#select the columns of interest
df2 = df1.select(["id","label","time1"])
#df2.show()

+---+-----+------------------+
| id|label|             time1|
+---+-----+------------------+
|  1|start|04.17.41.721000000|
|  1|  end|04.17.41.736000000|
|  2|start|04.17.41.752000000|
|  2|  end|04.17.41.767000000|
|  3|start|04.17.41.783000000|
|  3|  end|04.17.41.799000000|
|  4|start|04.17.41.814000000|
|  4|  end|04.17.41.830000000|
|  5|start|04.17.41.846000000|
|  5|  end|04.17.41.861000000|
|  6|start|04.17.41.877000000|
|  6|  end|04.17.41.892000000|
|  7|start|04.17.41.908000000|
|  7|  end|04.17.45.788000000|
|  8|start|04.17.45.819000000|
|  8|  end|04.17.46.553000000|
|  9|start|04.17.46.569000000|
|  9|  end|04.17.46.631000000|
| 10|start|04.17.46.647000000|
| 10|  end|04.17.46.663000000|
+---+-----+------------------+
only showing top 20 rows



In [26]:
#regex to time column and create new col without milliseconds
df3 = df2.withColumn('time_pp',regexp_extract(df2['time1'], '\d{2}\.\d{2}\.\d{2}',0)).drop('time1')

+---+-----+--------+
| id|label| time_pp|
+---+-----+--------+
|  1|start|04.17.41|
|  1|  end|04.17.41|
|  2|start|04.17.41|
|  2|  end|04.17.41|
|  3|start|04.17.41|
|  3|  end|04.17.41|
|  4|start|04.17.41|
|  4|  end|04.17.41|
|  5|start|04.17.41|
|  5|  end|04.17.41|
|  6|start|04.17.41|
|  6|  end|04.17.41|
|  7|start|04.17.41|
|  7|  end|04.17.45|
|  8|start|04.17.45|
|  8|  end|04.17.46|
|  9|start|04.17.46|
|  9|  end|04.17.46|
| 10|start|04.17.46|
| 10|  end|04.17.46|
+---+-----+--------+
only showing top 20 rows



In [27]:
#split col time pp and extract hour, minute, second in string type
df4 = df3.withColumn('time_splitted', F.split(df3['time_pp'], '\.'))\
         .withColumn('hour', F.col('time_splitted')[0])\
         .withColumn('minute', F.col('time_splitted')[1])\
         .withColumn('second', F.col('time_splitted')[2])
#df4.show()

In [15]:
#transform string type cols to ints 
df5 = df4.withColumn('hour_int', df4['hour'].cast(IntegerType())*60*60)\
         .withColumn('minute_int', df4['minute'].cast(IntegerType())*60)\
         .withColumn('second_int', df4['second'].cast(IntegerType()))\
         .drop('hour','minute','second')
         
#create new col sum of seconds
df6 = df5.withColumn('seconds_sum', df5['hour_int']+df5['minute_int']+df5['second_int'])
df6.show(40)

+---+-----+--------+-------------+--------+----------+----------+-----------+
| id|label| time_pp|time_splitted|hour_int|minute_int|second_int|seconds_sum|
+---+-----+--------+-------------+--------+----------+----------+-----------+
|  1|start|04.17.41| [04, 17, 41]|   14400|      1020|        41|      15461|
|  1|  end|04.17.41| [04, 17, 41]|   14400|      1020|        41|      15461|
|  2|start|04.17.41| [04, 17, 41]|   14400|      1020|        41|      15461|
|  2|  end|04.17.41| [04, 17, 41]|   14400|      1020|        41|      15461|
|  3|start|04.17.41| [04, 17, 41]|   14400|      1020|        41|      15461|
|  3|  end|04.17.41| [04, 17, 41]|   14400|      1020|        41|      15461|
|  4|start|04.17.41| [04, 17, 41]|   14400|      1020|        41|      15461|
|  4|  end|04.17.41| [04, 17, 41]|   14400|      1020|        41|      15461|
|  5|start|04.17.41| [04, 17, 41]|   14400|      1020|        41|      15461|
|  5|  end|04.17.41| [04, 17, 41]|   14400|      1020|        41

In [16]:
#create dataframe with label == start and label = end and prepare for join 
df_start = df6.select(['id','label','seconds_sum'])\
              .filter(df6['label']=='start')\
              .withColumnRenamed('id', 'id_start')\
              .withColumnRenamed('seconds_sum', 'seconds_start')\
              .drop('label')

df_end = df6.select(['id','label','seconds_sum'])\
              .filter(df6['label']=='end')\
              .withColumnRenamed('id', 'id_end')\
              .withColumnRenamed('seconds_sum', 'seconds_end')\
              .drop('label')

df_start.show(), df_end.show()

+--------+-------------+
|id_start|seconds_start|
+--------+-------------+
|       1|        15461|
|       2|        15461|
|       3|        15461|
|       4|        15461|
|       5|        15461|
|       6|        15461|
|       7|        15461|
|       8|        15465|
|       9|        15466|
|      10|        15466|
|      11|        15466|
|      12|        15466|
|      13|        15466|
|      14|        15466|
|      15|        15466|
|      16|        15491|
|      17|        15491|
+--------+-------------+

+------+-----------+
|id_end|seconds_end|
+------+-----------+
|     1|      15461|
|     2|      15461|
|     3|      15461|
|     4|      15461|
|     5|      15461|
|     6|      15461|
|     7|      15465|
|     8|      15466|
|     9|      15466|
|    10|      15466|
|    11|      15466|
|    12|      15466|
|    13|      15466|
|    14|      15466|
|    15|      15491|
|    16|      15491|
|    17|      15491|
+------+-----------+



(None, None)

In [28]:
#join the dfs 
joindf1 = df_start.join(df_end, df_start.id_start == df_end.id_end)
joindf1.show()

+--------+-------------+------+-----------+
|id_start|seconds_start|id_end|seconds_end|
+--------+-------------+------+-----------+
|       7|        15461|     7|      15465|
|      15|        15466|    15|      15491|
|      11|        15466|    11|      15466|
|       3|        15461|     3|      15461|
|       8|        15465|     8|      15466|
|      16|        15491|    16|      15491|
|       5|        15461|     5|      15461|
|      17|        15491|    17|      15491|
|       6|        15461|     6|      15461|
|       9|        15466|     9|      15466|
|       1|        15461|     1|      15461|
|      10|        15466|    10|      15466|
|       4|        15461|     4|      15461|
|      12|        15466|    12|      15466|
|      13|        15466|    13|      15466|
|      14|        15466|    14|      15466|
|       2|        15461|     2|      15461|
+--------+-------------+------+-----------+



In [29]:
#rename cols not in need in order to drop them also create the difference col 
joindf2 = joindf1.withColumnRenamed('id_start', 'id')\
                 .withColumn('diff', joindf1['seconds_end']-joindf1['seconds_start'])\
                 .drop('id_end').sort("id")
joindf2.show()

+---+-------------+-----------+----+
| id|seconds_start|seconds_end|diff|
+---+-------------+-----------+----+
|  1|        15461|      15461|   0|
| 10|        15466|      15466|   0|
| 11|        15466|      15466|   0|
| 12|        15466|      15466|   0|
| 13|        15466|      15466|   0|
| 14|        15466|      15466|   0|
| 15|        15466|      15491|  25|
| 16|        15491|      15491|   0|
| 17|        15491|      15491|   0|
|  2|        15461|      15461|   0|
|  3|        15461|      15461|   0|
|  4|        15461|      15461|   0|
|  5|        15461|      15461|   0|
|  6|        15461|      15461|   0|
|  7|        15461|      15465|   4|
|  8|        15465|      15466|   1|
|  9|        15466|      15466|   0|
+---+-------------+-----------+----+



In [31]:
#drop col not in need and sort diff desc
joindf3 = joindf2.drop('seconds_start','seconds_end').sort("diff",ascending=False)
joindf3.show()

+---+----+
| id|diff|
+---+----+
| 15|  25|
|  7|   4|
|  8|   1|
| 11|   0|
|  3|   0|
| 16|   0|
| 17|   0|
|  6|   0|
| 10|   0|
|  5|   0|
| 14|   0|
|  9|   0|
|  1|   0|
|  4|   0|
| 12|   0|
| 13|   0|
|  2|   0|
+---+----+

