In [11]:
import shutil
import pandas as pd
import pandas_profiling

In [2]:
# Concatenating all the "Performance" files into one .txt file
filenames_time = ['historical_data1_time_Q12015.txt', 'historical_data1_time_Q22015.txt', 'historical_data1_time_Q32015.txt', 'historical_data1_time_Q42015.txt',
            'historical_data1_time_Q12016.txt', 'historical_data1_time_Q22016.txt', 'historical_data1_time_Q32016.txt', 'historical_data1_time_Q42016.txt',
            'historical_data1_time_Q12017.txt']
with open('output_time_file.txt','wb') as wfd:
    for f in filenames_time:
        with open(f,'rb') as fd:
            shutil.copyfileobj(fd, wfd, 1024*1024*10)

In [3]:
from pyspark import SparkContext
# from pyspark.sql import SQLContext
# from pyspark.sql.types import *

# Load relevant objects
# sc = SparkContext('local')

#### Next we load in the concatenated text file into a PySpark dataframe, which looks like the following

In [4]:
sc.setLogLevel("WARN")
log_txt=sc.textFile("output_time_file.txt")
temp_var = log_txt.map(lambda k: k.split("|"))
header = log_txt.first()
log_df=temp_var.toDF()
log_df.show()
# .split("|")

+------------+------+---------+---+---+---+---+---+---+------+-----+---+---+---+---+---+---+---+---+---+---+---+---+---+
|          _1|    _2|       _3| _4| _5| _6| _7| _8| _9|   _10|  _11|_12|_13|_14|_15|_16|_17|_18|_19|_20|_21|_22|_23|_24|
+------------+------+---------+---+---+---+---+---+---+------+-----+---+---+---+---+---+---+---+---+---+---+---+---+---+
|F115Q1000001|201504|   320000|  0|  0|180|   |   |   |      |2.625|  0|   |   |   |   |   |   |   |   |   |   |   |   |
|F115Q1000001|201505|   319000|  0|  1|179|   |   |   |      |2.625|  0|   |   |   |   |   |   |   |   |   |   |   |   |
|F115Q1000001|201506|   317000|  0|  2|178|   |   |   |      |2.625|  0|   |   |   |   |   |   |   |   |   |   |   |   |
|F115Q1000001|201507|   316000|  0|  3|177|   |   |   |      |2.625|  0|   |   |   |   |   |   |   |   |   |   |   |   |
|F115Q1000001|201508|   314000|  0|  4|176|   |   |   |      |2.625|  0|   |   |   |   |   |   |   |   |   |   |   |   |
|F115Q1000001|201509|   313000| 

#### Now filtering the rows of dataframe log_df by cases when values in column "3" equal 0. New dataframe is called new_df

In [6]:
from pyspark.sql.functions import col
new_df = log_df.where((col("_3") == "0"))
new_df.show()

+------------+------+---+---+---+---+---+---+---+------+-----+---+---+---+---+---+---+---+---+---+---+---+---+---+
|          _1|    _2| _3| _4| _5| _6| _7| _8| _9|   _10|  _11|_12|_13|_14|_15|_16|_17|_18|_19|_20|_21|_22|_23|_24|
+------------+------+---+---+---+---+---+---+---+------+-----+---+---+---+---+---+---+---+---+---+---+---+---+---+
|F115Q1000001|201610|  0|  0| 18|162|  N|   | 01|201610|2.625|  0|   |   |   |   |   |   |   |   |   |   |  0|   |
|F115Q1000010|201702|  0|  0| 24|336|  N|   | 01|201702|4.625|  0|   |   |   |   |   |   |   |   |   |   |  0|   |
|F115Q1000011|201701|  0|  0| 21|159|  N|   | 01|201701|  3.5|  0|   |   |   |   |   |   |   |   |   |   |  0|   |
|F115Q1000012|201608|  0|  0| 17|343|  N|   | 01|201608|3.875|  0|   |   |   |   |   |   |   |   |   |   |  0|   |
|F115Q1000017|201705|  0|  0| 26|334|  N|   | 01|201705|3.875|  0|   |   |   |   |   |   |   |   |   |   |  0|   |
|F115Q1000020|201609|  0|  0| 17|343|  N|   | 01|201609|3.875|  0|   |   |   |  

#### Selecting only the first few columns, because the others are not required for analysis and mostly contain null values

In [7]:
pdf = new_df.select(["_1","_2","_3","_4","_5","_6"])
pdf.show()

+------------+------+---+---+---+---+
|          _1|    _2| _3| _4| _5| _6|
+------------+------+---+---+---+---+
|F115Q1000001|201610|  0|  0| 18|162|
|F115Q1000010|201702|  0|  0| 24|336|
|F115Q1000011|201701|  0|  0| 21|159|
|F115Q1000012|201608|  0|  0| 17|343|
|F115Q1000017|201705|  0|  0| 26|334|
|F115Q1000020|201609|  0|  0| 17|343|
|F115Q1000021|201701|  0|  0| 23|337|
|F115Q1000024|201705|  0|  0| 26|333|
|F115Q1000028|201607|  0|  0| 16|344|
|F115Q1000041|201505|  0|  0|  3|357|
|F115Q1000043|201608|  0|  0| 18|222|
|F115Q1000045|201703|  0|  0| 25|335|
|F115Q1000053|201512|  0|  0| 10|350|
|F115Q1000059|201505|  0|  0|  3|177|
|F115Q1000062|201701|  0|  0| 22|158|
|F115Q1000069|201609|  0|  0| 19|341|
|F115Q1000072|201609|  0|  0| 19|341|
|F115Q1000080|201705|  0|  0| 27|153|
|F115Q1000081|201702|  0|  0| 24|336|
|F115Q1000084|201608|  0|  0| 18|342|
+------------+------+---+---+---+---+
only showing top 20 rows



In [14]:
from pyspark.sql.functions import countDistinct
pdf.agg(countDistinct("_1")).show()

+------------------+
|count(DISTINCT _1)|
+------------------+
|            340128|
+------------------+



In [39]:
from pyspark.sql.functions import countDistinct
log_df.agg(countDistinct("_1")).show()

+------------------+
|count(DISTINCT _1)|
+------------------+
|           3115090|
+------------------+



In [16]:
adf = log_df.where((col("_1") == "F115Q1000002"))
adf.show(40)

+------------+------+---------+---+---+---+---+---+---+---+-----+---+---+---+---+---+---+---+---+---+---+---+---+---+
|          _1|    _2|       _3| _4| _5| _6| _7| _8| _9|_10|  _11|_12|_13|_14|_15|_16|_17|_18|_19|_20|_21|_22|_23|_24|
+------------+------+---------+---+---+---+---+---+---+---+-----+---+---+---+---+---+---+---+---+---+---+---+---+---+
|F115Q1000002|201503|   110000|  0|  0|360|   |   |   |   |3.875|  0|   |   |   |   |   |   |   |   |   |   |   |   |
|F115Q1000002|201504|   110000|  0|  1|359|   |   |   |   |3.875|  0|   |   |   |   |   |   |   |   |   |   |   |   |
|F115Q1000002|201505|   109000|  0|  2|358|   |   |   |   |3.875|  0|   |   |   |   |   |   |   |   |   |   |   |   |
|F115Q1000002|201506|   109000|  0|  3|357|   |   |   |   |3.875|  0|   |   |   |   |   |   |   |   |   |   |   |   |
|F115Q1000002|201507|   109000|  0|  4|356|   |   |   |   |3.875|  0|   |   |   |   |   |   |   |   |   |   |   |   |
|F115Q1000002|201508|   109000|  0|  5|355|   |   |   | 

In [21]:
bdf = log_df.where((col("_4") == "3"))
bdf.show()

+------------+------+---------+---+---+---+---+---+---+---+-----+---+------+---+---+---+---+---+---+---+---+---+---+---+
|          _1|    _2|       _3| _4| _5| _6| _7| _8| _9|_10|  _11|_12|   _13|_14|_15|_16|_17|_18|_19|_20|_21|_22|_23|_24|
+------------+------+---------+---+---+---+---+---+---+---+-----+---+------+---+---+---+---+---+---+---+---+---+---+---+
|F115Q1000155|201611|104365.22|  3| 21|339|   |   |   |   |4.125|  0|      |   |   |   |   |   |   |   |   |   |   |   |
|F115Q1000341|201605|380555.33|  3| 15|345|   |   |   |   |4.125|  0|      |   |   |   |   |   |   |   |   |   |   |   |
|F115Q1000373|201609| 49565.82|  3| 19|161|   |   |   |   |  3.5|  0|      |   |   |   |   |   |   |   |   |   |   |   |
|F115Q1000373|201610|  49331.5|  3| 20|160|   |   |   |   |  3.5|  0|      |   |   |   |   |   |   |   |   |   |   |   |
|F115Q1000954|201612| 83068.58|  3| 22|158|   |   |   |   | 3.75|  0|      |   |   |   |   |   |   |   |   |   |   |   |
|F115Q1001629|201611| 32925.95| 

In [18]:
bdf.count()

2996

In [23]:
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window
from pyspark.sql.functions import count, col 

cnts = log_df.groupBy("_1").agg(count("*").alias("cnt")).alias("cnts")

w = Window().partitionBy("_4").orderBy(col("cnt").desc())

(cnts
  .withColumn("rn", row_number().over(w))
  .where(col("rn") == 1)
  .select("_1", "_2", "_3", "_4", "_5", "_6"))

AnalysisException: "cannot resolve '`_4`' given input columns: [cnts._1, cnts.cnt];;\n'Project [_1#0, cnt#769L, row_number() windowspecdefinition('_4, cnt#769L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#774]\n+- AnalysisBarrier\n      +- SubqueryAlias cnts\n         +- Aggregate [_1#0], [_1#0, count(1) AS cnt#769L]\n            +- LogicalRDD [_1#0, _2#1, _3#2, _4#3, _5#4, _6#5, _7#6, _8#7, _9#8, _10#9, _11#10, _12#11, _13#12, _14#13, _15#14, _16#15, _17#16, _18#17, _19#18, _20#19, _21#20, _22#21, _23#22, _24#23], false\n"

In [25]:
import pyspark.sql.functions as F
from pyspark.sql.functions import count, col 
from pyspark.sql.functions import struct

cnts = log_df.groupBy("_1").agg(count("*").alias("cnt")).alias("cnts")

(cnts
  .groupBy("_1")
  .agg(F.max(struct(col("cnt"), col("_4"))).alias("max"))
  .select(col("_1"), col("max._4")))

AnalysisException: "cannot resolve '`_4`' given input columns: [cnts._1, cnts.cnt];;\n'Aggregate [_1#0], [_1#0, max(named_struct(cnt, cnt#801L, NamePlaceholder, '_4)) AS max#808]\n+- AnalysisBarrier\n      +- SubqueryAlias cnts\n         +- Aggregate [_1#0], [_1#0, count(1) AS cnt#801L]\n            +- LogicalRDD [_1#0, _2#1, _3#2, _4#3, _5#4, _6#5, _7#6, _8#7, _9#8, _10#9, _11#10, _12#11, _13#12, _14#13, _15#14, _16#15, _17#16, _18#17, _19#18, _20#19, _21#20, _22#21, _23#22, _24#23], false\n"

In [27]:
import org.apache.spark.sql.functions.{row_number, max, broadcast}
import org.apache.spark.sql.expressions.Window

# val w = Window.partitionBy($"_1").orderBy($"_4".desc)

# val dfTop = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn")

# dfTop.show

SyntaxError: invalid syntax (<ipython-input-27-31345c3e7047>, line 1)

In [36]:
import org.apache.spark.sql.functions.max
import org.apache.spark.sql.expressions.Window
from pyspark.sql.functions import struct
dfa = log_df.select($"_1", struct($"_2", $"_3", $"_4", $"_5").alias("vs"))
  .groupBy($"hour")
  .agg(max("vs").alias("vs"))
  .select($"Hour", $"vs._2", $"vs._3", $"vs._4", $"vs._5")

dfa.show()

SyntaxError: invalid syntax (<ipython-input-36-b3dda5957ef1>, line 4)