In [2]:
import os 
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,from_utc_timestamp,hour,count,isnull

In [3]:
# Location of the SEC log CSV used throughout this notebook.
# The directory can be overridden with the SEC_LOG_DATA_DIR environment variable so the
# notebook is not tied to one machine; the default is the original local directory.
data_dir = os.environ.get(
    'SEC_LOG_DATA_DIR',
    '/home/amogh/Documents/vs_code_python/spark_itversity/spark_certification_notes/01-spark-etl-databricks/us_sec_gov_log_data',
)
path = os.path.join(data_dir, 'log20170630.csv')

In [4]:
# Create the Spark session for this notebook, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName('spark_etl_databricks')
    .getOrCreate()
)

In [5]:
# Read the CSV (first line is the header) and keep a seeded 30% sample,
# drawn without replacement, to work with.
logDF = (
    spark.read
    .option('header', 'true')
    .csv(path)
    .sample(withReplacement=False, fraction=0.3, seed=3)
)

In [29]:
# In this environment display() shows only the DataFrame's schema repr (see the recorded
# output), not its rows. Every column is a string because the CSV was read without a schema.
display(logDF)

DataFrame[ip: string, date: string, time: string, zone: string, cik: string, accession: string, extention: string, code: string, size: string, idx: string, norefer: string, noagent: string, find: string, crawler: string, browser: string]

In [33]:
# Pull the first three rows to the driver to eyeball the actual values.
logDF.take(3)

[Row(ip='101.81.133.jja', date='2017-06-30', time='00:00:00', zone='0.0', cik='1608552.0', accession='0001047469-17-004337', extention='-index.htm', code='200.0', size='80251.0', idx='1.0', norefer='0.0', noagent='0.0', find='9.0', crawler='0.0', browser=None),
 Row(ip='107.23.85.jfd', date='2017-06-30', time='00:00:00', zone='0.0', cik='1136894.0', accession='0000905148-07-003827', extention='-index.htm', code='200.0', size='3021.0', idx='1.0', norefer='0.0', noagent='0.0', find='10.0', crawler='0.0', browser=None),
 Row(ip='107.23.85.jfd', date='2017-06-30', time='00:00:00', zone='0.0', cik='841535.0', accession='0000841535-10-000003', extention='-index.htm', code='200.0', size='2716.0', idx='1.0', norefer='0.0', noagent='0.0', find='10.0', crawler='0.0', browser=None)]

In [34]:
# List every distinct value of the `code` column present in the sample.
distinct_codes = logDF.select('code').distinct()
distinct_codes.show()

+-----+
| code|
+-----+
|200.0|
|503.0|
|404.0|
| null|
|  0.0|
|400.0|
|206.0|
|429.0|
|500.0|
|403.0|
|502.0|
|301.0|
|504.0|
|304.0|
+-----+



In [35]:
# Let's look at server-side error codes, i.e. codes from 500 up to (but not including) 600

In [43]:
# `code` is a string column (the CSV was read without a schema; values look like '503.0'),
# so cast it to a number explicitly instead of relying on implicit string-to-number
# coercion inside the comparison. Null codes stay null and are dropped by the filter.
status_code = col("code").cast("double")
is_server_error = (status_code >= 500) & (status_code < 600)

# Keep only the fields needed for the server-error analysis; `code` is selected in its
# original (string) form, so the resulting schema is unchanged.
servererror_code_df = (
    logDF
    .filter(is_server_error)
    .select("date", "time", "extention", "code")
)

In [44]:
# Shows the schema of the filtered frame (four string columns in the recorded output).
display(servererror_code_df)

DataFrame[date: string, time: string, extention: string, code: string]

In [45]:
# Spot-check one row to confirm the filter returns a server-side error code.
servererror_code_df.take(1)

[Row(date='2017-06-30', time='00:00:11', extention='-index.htm', code='503.0')]

In [52]:
# Data validation
# One aspect of an ETL job is validating that the data is what you expect. That includes:
#   * The number of records
#   * The expected fields are present
#   * No unexpected missing values

In [None]:
# Look at server-side errors by hour

In [62]:
# Bucket every server-error record by the hour taken from its `time` value,
# count the records in each bucket, and list the buckets in hour order.
request_hour = hour(from_utc_timestamp('time', 'GMT')).alias('hour')
servererror_perhour_df = (
    servererror_code_df
    .select(request_hour)
    .groupBy("hour")
    .count()
    .orderBy("hour")
)

In [63]:
# Shows the schema of the hourly aggregate: an integer `hour` and a bigint `count`.
display(servererror_perhour_df)

DataFrame[hour: int, count: bigint]

In [69]:
# Write the server-error records next to the input CSV in Parquet format.
# The directory is derived from `path` so the data location is defined in one place
# instead of being repeated as a second hard-coded absolute path.
servererror_parquet_path = os.path.join(
    os.path.dirname(path),
    "serverside_error_log20170630.parquet",
)
(
    servererror_code_df
    .repartition(1)      # collapse to a single partition before writing
    .write
    .mode("overwrite")   # replace the output of any previous run
    .parquet(servererror_parquet_path)
)

In [72]:
# Saving the file in Parquet format (a columnar format)

In [65]:
# Print up to 24 rows so every hour bucket (0-23) fits in one table.
servererror_perhour_df.show(24)

+----+-----+
|hour|count|
+----+-----+
|   0| 1956|
|   1| 1920|
|   2| 1813|
|   3| 1962|
|   4| 1957|
|   5| 2060|
|   6| 1957|
|   7| 1936|
|   8| 1996|
|   9| 1966|
|  10| 1931|
|  11| 1973|
|  12| 1733|
|  13| 1904|
|  14| 2055|
|  15| 1960|
|  16| 1977|
|  17| 1976|
|  18| 1943|
|  19| 2003|
|  20| 2008|
|  21| 1992|
|  22| 1792|
|  23|   22|
+----+-----+



In [70]:
# Count of records per IP address

In [6]:
# Number of log records per IP address, with the most frequent address first.
records_per_ip = logDF.select('ip').groupBy("ip").count()
ipcount = records_per_ip.orderBy(col('count').desc())

In [88]:
# Print the top of the ranking (the recorded output shows the first 20 rows).
ipcount.show()

+---------------+------+
|             ip| count|
+---------------+------+
| 109.145.75.gge|226596|
|165.124.130.jhd|213192|
| 38.105.116.iei|193443|
|165.124.130.igj|190611|
|  52.23.159.dgd|190483|
| 54.162.220.aah|133406|
| 117.91.230.gha|124162|
|  108.91.91.hbc|117873|
|180.119.118.baj|112997|
|  96.127.52.gig|111572|
|165.124.130.jdj|106432|
|  23.20.108.ihh| 91046|
|  149.56.12.jbf| 86488|
|  13.90.101.dfh| 80983|
|  65.254.10.fdf| 80557|
|192.223.241.fcg| 78053|
|  209.249.4.gjc| 74681|
|180.119.116.cjb| 74313|
|180.119.118.fib| 72516|
|   78.137.2.bcj| 61102|
+---------------+------+
only showing top 20 rows



In [7]:
# Persist the per-IP counts as Parquet, replacing the output of any earlier run.
(
    ipcount.write
    .mode('overwrite')
    .parquet('/tmp/ipcount.parquet')
)

In [89]:
# Take the first row of the ranking (the most frequent IP) and split it into its two fields.
top_ip_row = ipcount.first()
ip1 = top_ip_row[0]
count1 = top_ip_row[1]

In [108]:
def dftest(ip,ipcount:int,ipvalue,ipcountvalue:int):
    """Print 'pass' if an (ip, count) pair matches the expected pair, otherwise 'fail'.

    Args:
        ip: The IP value under test.
        ipcount: The count under test.
        ipvalue: The expected IP value.
        ipcountvalue: The expected count.

    Returns:
        None. The verdict is only printed.
    """
    # Both fields have to agree for the check to pass.
    verdict = 'pass' if (ip == ipvalue and ipcount == ipcountvalue) else 'fail'
    print(verdict)

In [109]:
# Validate the values actually read from the DataFrame (ip1, count1) against the
# expected top IP and its count. Passing the same literals on both sides would
# always print 'pass' and would not exercise the data at all.
dftest(ip1, count1, '109.145.75.gge', 226596)

pass
