In [2]:
# imports
import sys
from pyspark.sql import SparkSession, functions, types
import ntpath
import re

In [3]:
# boilerplate 
spark = SparkSession.builder.appName('reddit averages').getOrCreate()
spark.sparkContext.setLogLevel('WARN')

assert sys.version_info >= (3, 5) # make sure we have Python 3.5+
assert spark.version >= '2.3' # make sure we have Spark 2.3+

In [4]:
# define schema
pages_schema = types.StructType([
    types.StructField('page', types.StringType()),
    types.StructField('title', types.StringType()),
    types.StructField('requests', types.IntegerType()),
    types.StructField('bytes', types.IntegerType())
])

In [5]:
df = spark.read.csv('pagecounts-1/', sep=' ', schema=pages_schema).\
    withColumn('filename', functions.input_file_name()).cache()
df = df.filter((df.page == 'en') & (df.title != 'Main_Page') & \
              ~df.title.startswith('Special:'))
df.show()

+----+--------------------+--------+-------+--------------------+
|page|               title|requests|  bytes|            filename|
+----+--------------------+--------+-------+--------------------+
|  en|Simon_%22Ghost%22...|       2|  39816|file:///Users/cha...|
|  en| Simon_%26_Garfunkel|      91|4543932|file:///Users/cha...|
|  en|Simon_%26_Garfunk...|      19| 748974|file:///Users/cha...|
|  en|  Simon_%26_Schuster|      18| 883010|file:///Users/cha...|
|  en|Simon_%26_Schuste...|       1|   6651|file:///Users/cha...|
|  en|Simon_%26_Schuste...|       1|  28288|file:///Users/cha...|
|  en|Simon_%26_Schuste...|       1|  28333|file:///Users/cha...|
|  en|Simon_%26_Schuste...|       1|  28338|file:///Users/cha...|
|  en|Simon_%26_Schuste...|       1|  28316|file:///Users/cha...|
|  en|Simon_%26_Schuste...|       1|  28317|file:///Users/cha...|
|  en|     Simon_%26_Simon|      11| 268170|file:///Users/cha...|
|  en|    Simon_%28game%29|       1|  22180|file:///Users/cha...|
|  en|   S

In [6]:
def get_date(path):
    base = ntpath.basename(path)
    try:
        res = re.search('-(.+?-\d{1,2})[0]', base).group(1)
    except AttributeError:
        res = ''
    return res

In [7]:
path_to_hour = functions.udf(get_date, returnType=types.StringType())

In [8]:
df = df.withColumn('hour', path_to_hour('filename'))

In [9]:
MaxHours = df.groupBy('hour').max('requests').withColumnRenamed('max(requests)', 'max')
MaxHours = MaxHours.withColumnRenamed('hour', 'h')

In [10]:
df = df.join(MaxHours, (df.hour == MaxHours.h) & (df.requests == MaxHours.max))

In [11]:
df = df.drop('h', 'max').sort('hour', 'title')
df = df.select('hour', 'title', 'requests')
df.select('hour').distinct().sort('hour').show()

+-----------+
|       hour|
+-----------+
|20160801-00|
|20160801-01|
|20160801-02|
|20160801-03|
|20160801-04|
|20160801-05|
|20160801-06|
|20160801-07|
|20160801-08|
|20160801-09|
|20160801-10|
|20160801-11|
|20160801-12|
|20160801-13|
|20160801-14|
|20160801-15|
|20160801-16|
|20160801-17|
|20160801-18|
|20160801-19|
+-----------+
only showing top 20 rows



In [24]:
df1 = df.select('hour', 'requests')
df1 = df.groupby('hour').agg({'requests': 'count'})
df1.explain()

== Physical Plan ==
*(9) HashAggregate(keys=[hour#85], functions=[count(requests#2)])
+- Exchange hashpartitioning(hour#85, 200)
   +- *(8) HashAggregate(keys=[hour#85], functions=[partial_count(requests#2)])
      +- *(8) Project [hour#85, requests#2]
         +- *(8) Sort [hour#85 ASC NULLS FIRST, title#1 ASC NULLS FIRST], true, 0
            +- Exchange rangepartitioning(hour#85 ASC NULLS FIRST, title#1 ASC NULLS FIRST, 200)
               +- *(7) Project [title#1, requests#2, hour#85]
                  +- *(7) BroadcastHashJoin [hour#85, requests#2], [h#105, max#102], Inner, BuildRight
                     :- *(7) Project [title#1, requests#2, pythonUDF0#795 AS hour#85]
                     :  +- BatchEvalPython [get_date(filename#8)], [filename#8, requests#2, title#1, pythonUDF0#795]
                     :     +- *(2) Project [filename#8, requests#2, title#1]
                     :        +- *(2) Project [page#0, title#1, requests#2, bytes#3, filename#8]
                     :    