In [1]:
import pandas as pd

In [2]:
# Column names for the pipe-delimited log; they are supplied here because the
# file is read with header=None.
columns = [
    'task',
    'status',
    'trackerId',
    'server_region',
    'timestamp',
    'server',
]

df = pd.read_csv('log.txt', sep='|', names=columns, header=None)

In [3]:
# Work on an independent (deep) copy so the frame loaded into `df` stays untouched.
file = df.copy(deep=True)

In [4]:
# Preview the first five rows (head() defaults to n=5).
file.head()

Unnamed: 0,task,status,trackerId,server_region,timestamp,server
0,Data quality assurance,success,89328463487,Florida;USA,1599093305000,464f:bdbd:1d:535a:2dad:5117:a92f:b359
1,Designing and implementing data models,success,89328463488,Ontario;Canada,1589218599000,1366:c314:8d40:4219:4a51:138f:7ce3:d244
2,Extracting data from various sources,success,89328463489,California;USA,1595463446000,1ffa:cdd1:7a2b:a7d8:ab00:eada:94b3:7a7c
3,Loading data into a data warehouse,success,89328463490,Florida;USA,1605932487000,8ac5:9010:f24c:2f06:4679:aaec:a737:f003
4,Data quality assurance,success,89328463491,Queensland;Australia,1587354266000,5a7d:18ec:4650:5421:185c:4fa0:2e86:6643


In [5]:
# Print the column dtypes, non-null counts and memory usage of the frame.
file.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 6 columns):
 #   Column         Non-Null Count  Dtype 
---  ------         --------------  ----- 
 0   task           1000 non-null   object
 1   status         1000 non-null   object
 2   trackerId      1000 non-null   int64 
 3   server_region  1000 non-null   object
 4   timestamp      1000 non-null   int64 
 5   server         1000 non-null   object
dtypes: int64(2), object(4)
memory usage: 47.0+ KB


In [6]:
# Remove rows that are exact duplicates across every column, keeping the first occurrence.
file = file.drop_duplicates(keep='first')

## issues
- set trackerId as index
- split state and country
- convert timestamp from Unix epoch (milliseconds) to datetime
- check failed operations
- convert to csv format


## set trackerId as index

In [7]:
# Promote trackerId to the row index; the column itself is dropped from the data columns.
file = file.set_index('trackerId', drop=True)

In [8]:
# Confirm that trackerId is now the index.
file.head(n=3)

Unnamed: 0_level_0,task,status,server_region,timestamp,server
trackerId,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
89328463487,Data quality assurance,success,Florida;USA,1599093305000,464f:bdbd:1d:535a:2dad:5117:a92f:b359
89328463488,Designing and implementing data models,success,Ontario;Canada,1589218599000,1366:c314:8d40:4219:4a51:138f:7ce3:d244
89328463489,Extracting data from various sources,success,California;USA,1595463446000,1ffa:cdd1:7a2b:a7d8:ab00:eada:94b3:7a7c


## split state and country

In [9]:
## defining functions to split state and country

def split_state(text):
    """Return the state part of a ``'state;country'`` region string.

    The state is everything before the first ``';'``. A string that contains
    no ``';'`` is returned unchanged.
    """
    state, _sep, _rest = text.partition(';')
    return state

def split_country(text):
    """Return the country part of a ``'state;country'`` region string.

    The country is the second ``';'``-separated field.

    Raises
    ------
    IndexError
        If ``text`` contains no ``';'`` separator.
    """
    fields = text.split(';')
    return fields[1]

In [10]:
# Derive two new columns from server_region ('state;country'):
# the text before the separator ...
file['state'] = file['server_region'].map(split_state)

# ... and the field after it.
file['country'] = file['server_region'].map(split_country)

In [11]:
# Check the new state / country columns on the last three rows.
file.tail(n=3)

Unnamed: 0_level_0,task,status,server_region,timestamp,server,state,country
trackerId,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
89328464484,Optimizing database performance,success,Texas;USA,1607161055000,cc7c:3591:1c62:280c:b259:dc89:e69d:2d75,Texas,USA
89328464485,Building data pipelines,success,Texas;USA,1588130663000,3336:1461:f197:bad9:705f:7dd:8c59:a5e4,Texas,USA
89328464486,Designing and implementing data models,success,New York;USA,1604987132000,633d:e149:e22f:a277:b5f6:dae8:b32c:ea9e,New York,USA


## change timestamp from unix to datetime 

In [12]:
## create a function to convert from milliseconds to seconds

def slice_zeros(text):
    """Convert an epoch timestamp in milliseconds to whole seconds.

    Parameters
    ----------
    text : str or int
        Epoch time in milliseconds, e.g. ``'1599093305000'``.

    Returns
    -------
    str
        Epoch time in whole seconds as a decimal string,
        e.g. ``'1599093305'``.

    Raises
    ------
    ValueError
        If ``text`` is a string that is not a valid integer literal.
    """
    # Divide rather than chop the last three characters: slicing yields ''
    # for values shorter than four digits ('999'[:-3] == '') and leaves a
    # bare '-' for short negative values, neither of which is a valid number.
    seconds = str(int(text) // 1000)
    return seconds


In [13]:
# Overwrite the timestamp column with its string form so that the per-value
# helper applied in the next step receives text it can slice.
file['timestamp'] = file['timestamp'].astype(str)  ## the column is no longer numeric after this line

In [14]:
# New 'datetime' column: each millisecond timestamp reduced to seconds (still text at this point).
file['datetime'] = file['timestamp'].map(slice_zeros)

In [15]:
# 'datetime' holds epoch seconds as digit strings at this point. Cast to int64
# before converting: recent pandas versions deprecate parsing *strings* together
# with `unit=` (a FutureWarning today; such strings are slated to be parsed as
# date strings instead of numbers).
file['datetime'] = pd.to_datetime(file['datetime'].astype('int64'), unit='s')

In [16]:
# Re-inspect the dtypes after adding the new columns and converting 'datetime'.
file.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 1000 entries, 89328463487 to 89328464486
Data columns (total 8 columns):
 #   Column         Non-Null Count  Dtype         
---  ------         --------------  -----         
 0   task           1000 non-null   object        
 1   status         1000 non-null   object        
 2   server_region  1000 non-null   object        
 3   timestamp      1000 non-null   object        
 4   server         1000 non-null   object        
 5   state          1000 non-null   object        
 6   country        1000 non-null   object        
 7   datetime       1000 non-null   datetime64[ns]
dtypes: datetime64[ns](1), object(7)
memory usage: 70.3+ KB


In [17]:
# Compare the raw timestamp with the parsed datetime on the first three rows.
file.head(n=3)

Unnamed: 0_level_0,task,status,server_region,timestamp,server,state,country,datetime
trackerId,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
89328463487,Data quality assurance,success,Florida;USA,1599093305000,464f:bdbd:1d:535a:2dad:5117:a92f:b359,Florida,USA,2020-09-03 00:35:05
89328463488,Designing and implementing data models,success,Ontario;Canada,1589218599000,1366:c314:8d40:4219:4a51:138f:7ce3:d244,Ontario,Canada,2020-05-11 17:36:39
89328463489,Extracting data from various sources,success,California;USA,1595463446000,1ffa:cdd1:7a2b:a7d8:ab00:eada:94b3:7a7c,California,USA,2020-07-23 00:17:26


## exporting as a csv file

In [18]:
# Export the cleaned frame as CSV; the trackerId index is written as the first column.
file.to_csv('log.csv', index=True)

## Working with PySpark

In [19]:
from pyspark.sql import SparkSession

In [20]:
# Start a Spark session named 'pyspark_clean', or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName('pyspark_clean')
    .getOrCreate()
)

## creating a schema and reading file

A schema defines the column names and data types of a DataFrame.

In [43]:
# NOTE(review): wildcard imports hide where names come from, and
# pyspark.sql.functions exports names such as sum/min/max that shadow the
# Python builtins. Consider importing only the names this notebook uses.
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Schema for the raw log: every field is declared as a nullable string.
schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in (
        "task",
        "status",
        "trackerId",
        "server_region",
        "timestamp",
        "server",
    )
])

In [44]:
# Read the pipe-delimited text file using the explicit schema (no type inference).
log_file = (
    spark.read
    .schema(schema)
    .option('sep', '|')
    .csv('log.txt')
)

In [45]:
# Preview the first three rows.
log_file.show(n=3)

+--------------------+-------+-----------+--------------+-------------+--------------------+
|                task| status|  trackerId| server_region|    timestamp|              server|
+--------------------+-------+-----------+--------------+-------------+--------------------+
|Data quality assu...|success|89328463487|   Florida;USA|1599093305000|464f:bdbd:1d:535a...|
|Designing and imp...|success|89328463488|Ontario;Canada|1589218599000|1366:c314:8d40:42...|
|Extracting data f...|success|89328463489|California;USA|1595463446000|1ffa:cdd1:7a2b:a7...|
+--------------------+-------+-----------+--------------+-------------+--------------------+
only showing top 3 rows



In [24]:
# Print the column names, types and nullability of the loaded DataFrame.
log_file.printSchema()

root
 |-- task: string (nullable = true)
 |-- status: string (nullable = true)
 |-- trackerId: string (nullable = true)
 |-- server_region: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- server: string (nullable = true)



In [25]:
# Summary statistics (count, mean, stddev, min, max) for every column.
log_file.describe().show()

+-------+--------------------+-------+-----------------+--------------+-------------------+--------------------+
|summary|                task| status|        trackerId| server_region|          timestamp|              server|
+-------+--------------------+-------+-----------------+--------------+-------------------+--------------------+
|  count|                1000|   1000|             1000|          1000|               1000|                1000|
|   mean|                null|   null| 8.93284639865E10|          null|  1.596842686302E12|                null|
| stddev|                null|   null|288.8194360957494|          null|6.517130087623664E9|                null|
|    min|Building data pip...|      -|      89328463487|California;USA|      1585703319000|105a:b192:fae1:cc...|
|    max|Transforming data...|success|      89328464486|   Tokyo;Japan|      1608159945000|ffda:a2f4:7264:ec...|
+-------+--------------------+-------+-----------------+--------------+-------------------+-----

## converting timestamp column from unix epoch to timestamp

In [46]:
# Epoch milliseconds -> seconds, then render as a 'yyyy-MM-dd HH:mm:ss' string.
# from_unixtime formats the value in the Spark session time zone.
log_file = log_file.withColumn(
    'date_time',
    from_unixtime(col('timestamp') / 1000),
)


In [47]:
# Check the new date_time column.
log_file.show(n=3)

+--------------------+-------+-----------+--------------+-------------+--------------------+-------------------+
|                task| status|  trackerId| server_region|    timestamp|              server|          date_time|
+--------------------+-------+-----------+--------------+-------------+--------------------+-------------------+
|Data quality assu...|success|89328463487|   Florida;USA|1599093305000|464f:bdbd:1d:535a...|2020-09-03 00:35:05|
|Designing and imp...|success|89328463488|Ontario;Canada|1589218599000|1366:c314:8d40:42...|2020-05-11 17:36:39|
|Extracting data f...|success|89328463489|California;USA|1595463446000|1ffa:cdd1:7a2b:a7...|2020-07-23 00:17:26|
+--------------------+-------+-----------+--------------+-------------+--------------------+-------------------+
only showing top 3 rows



In [28]:
# Print the schema to check the type of the new date_time column.
log_file.printSchema()

root
 |-- task: string (nullable = true)
 |-- status: string (nullable = true)
 |-- trackerId: string (nullable = true)
 |-- server_region: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- server: string (nullable = true)
 |-- date_time: string (nullable = true)



In [48]:
# Parse the formatted date_time string into a proper timestamp column.
log_file = log_file.withColumn(
    'date_time',
    to_timestamp('date_time'),
)

In [49]:
# Preview the rows after the cast to timestamp.
log_file.show(n=3)

+--------------------+-------+-----------+--------------+-------------+--------------------+-------------------+
|                task| status|  trackerId| server_region|    timestamp|              server|          date_time|
+--------------------+-------+-----------+--------------+-------------+--------------------+-------------------+
|Data quality assu...|success|89328463487|   Florida;USA|1599093305000|464f:bdbd:1d:535a...|2020-09-03 00:35:05|
|Designing and imp...|success|89328463488|Ontario;Canada|1589218599000|1366:c314:8d40:42...|2020-05-11 17:36:39|
|Extracting data f...|success|89328463489|California;USA|1595463446000|1ffa:cdd1:7a2b:a7...|2020-07-23 00:17:26|
+--------------------+-------+-----------+--------------+-------------+--------------------+-------------------+
only showing top 3 rows



In [50]:
# Print the schema to confirm the type of date_time after the conversion.
log_file.printSchema()

root
 |-- task: string (nullable = true)
 |-- status: string (nullable = true)
 |-- trackerId: string (nullable = true)
 |-- server_region: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- server: string (nullable = true)
 |-- date_time: timestamp (nullable = true)



## split state and country into separate columns.

In [53]:
#splitting state and country

# server_region is 'state;country': element 0 of the split is the state,
# element 1 the country.
log_file = (
    log_file
    .withColumn('state', split(col('server_region'), ';')[0])
    .withColumn('country', split(col('server_region'), ';')[1])
)

In [58]:
# Check the new state / country columns.
log_file.show(n=3)

+--------------------+-------+-----------+--------------+-------------+--------------------+-------------------+----------+-------+
|                task| status|  trackerId| server_region|    timestamp|              server|          date_time|     state|country|
+--------------------+-------+-----------+--------------+-------------+--------------------+-------------------+----------+-------+
|Data quality assu...|success|89328463487|   Florida;USA|1599093305000|464f:bdbd:1d:535a...|2020-09-03 00:35:05|   Florida|    USA|
|Designing and imp...|success|89328463488|Ontario;Canada|1589218599000|1366:c314:8d40:42...|2020-05-11 17:36:39|   Ontario| Canada|
|Extracting data f...|success|89328463489|California;USA|1595463446000|1ffa:cdd1:7a2b:a7...|2020-07-23 00:17:26|California|    USA|
+--------------------+-------+-----------+--------------+-------------+--------------------+-------------------+----------+-------+
only showing top 3 rows



In [62]:
# Print the schema, which now includes the state and country columns.
log_file.printSchema()

root
 |-- task: string (nullable = true)
 |-- status: string (nullable = true)
 |-- trackerId: string (nullable = true)
 |-- server_region: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- server: string (nullable = true)
 |-- date_time: timestamp (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)

