In [1]:
import os
import csv
import gzip
import glob
import pathlib
import argparse
import datetime

In [2]:
import logging
########## logging
# create logger with 'spam_application'
logger = logging.getLogger('notebook')
logger.setLevel(logging.DEBUG)

# create console handler with a higher log level
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)

# create formatter and add it to the handlers
formatter = logging.Formatter('[%(asctime)s][%(levelname)s]: %(message)s')
ch.setFormatter(formatter)

# add the handlers to the logger
logger.addHandler(ch)
##########

In [3]:
import progressbar
progressbar.streams.wrap_stderr()

<progressbar.utils.WrappingIO at 0x7ff7bcd83470>

[2018-05-19 23:46:57,473][DEBUG]: input_files_count: 10
[2018-05-19 23:46:57,498][DEBUG]: input_file: data/test/pagecounts-20071210-070000.gz
[2018-05-19 23:46:57,503][INFO]: Processing file: data/test/pagecounts-20071210-070000.gz
[2018-05-19 23:47:00,404][INFO]: Added DataFrame for file data/test/pagecounts-20071210-070000.gz to list
[2018-05-19 23:47:00,405][DEBUG]: input_file: data/test/pagecounts-20071210-000000.gz
[2018-05-19 23:47:00,406][INFO]: Processing file: data/test/pagecounts-20071210-000000.gz
[2018-05-19 23:47:00,496][INFO]: Added DataFrame for file data/test/pagecounts-20071210-000000.gz to list
[2018-05-19 23:47:00,496][DEBUG]: input_file: data/test/pagecounts-20071210-080000.gz
[2018-05-19 23:47:00,498][INFO]: Processing file: data/test/pagecounts-20071210-080000.gz
[2018-05-19 23:47:00,562][INFO]: Added DataFrame for file data/test/pagecounts-20071210-080000.gz to list
[2018-05-19 23:47:00,567][DEBUG]: input_file: data/test/pagecounts-20071210-020000.gz
[2018-05-19 

In [4]:
import findspark
findspark.init()

In [5]:
import pyspark
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, IntegerType, TimestampType
from pyspark.sql import functions
from pyspark.sql.functions import lit

In [6]:
import os

# https://github.com/jupyter/docker-stacks/wiki/Docker-recipes#using-local-spark-jars

jarname = 'concatgroup_2.11-2.3.0_0.1.jar'
jarpath =  os.path.join(os.getcwd(), jarname)
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars {}'.format(jarpath)

In [7]:
conf = pyspark.SparkConf().set("spark.jars", jarpath)
sc = pyspark.SparkContext(appName="merge-pagecounts", conf=conf)
sqlctx = pyspark.SQLContext(sc)

In [8]:
schema = StructType([StructField("lang", StringType(), False),
                     StructField("page", StringType(), False),
                     StructField("views", IntegerType(), False),
                     StructField("reqbytes", IntegerType(), False)])

In [9]:
def unionAll(*dfs):
    first, *_ = dfs  # Python 3.x, for 2.x you'll have to unpack manually
    return first.sql_ctx.createDataFrame(
        first.sql_ctx._sc.union([df.rdd for df in dfs]),
        first.schema
    )


def date_parser(timestamp):
    return datetime.datetime.strptime(timestamp, '%Y%m%d-%H%M%S')

In [10]:
pathfile = "data/test/pagecounts-20071210-0*.gz"

In [11]:
input_files_count = len([f for f in glob.iglob(pathfile)])
input_files = glob.iglob(pathfile)

# input_files = ["data/input/sorted_time/2007-12/pagecounts-20071210-000000.gz",
#                "data/input/sorted_time/2007-12/pagecounts-20071210-010000.gz"
#                ]
# input_files_count = len(input_files)

logger.debug('input_files_count: {}'.format(input_files_count))

In [12]:
if input_files_count < 1:
    logger.warn('No input files match: exiting')
    exit(0)

In [13]:
input_files_count

10

In [14]:
list_dfs = list()
with progressbar.ProgressBar(max_value=input_files_count) as bar:
    for input_file in input_files:
        logger.debug('input_file: {}'.format(input_file))

        timestamp = date_parser(os.path.basename(input_file)
                                       .replace('pagecounts-','')
                                       .replace('.gz',''))

        logger.info('Processing file: {}'.format(input_file))
        tmp_spark_df = sqlctx.read.csv(
                            input_file,
                            header=False,
                            schema=schema,
                            sep=' ')

        tmp_spark_df = tmp_spark_df.withColumn("timestamp", lit(timestamp))
        list_dfs.append(tmp_spark_df)
        del tmp_spark_df

        logger.info('Added DataFrame for file {} to list'.format(input_file))

In [15]:
assert len(list_dfs) >= 1, 'There should be at least one DataFrame'

In [16]:
if len(list_dfs) > 1:
    logger.info('Union of all Spark DataFrames.')
    df = unionAll(*list_dfs)
    logger.info('Spark DataFrame created')
else:
    df = list_dfs[0]

In [17]:
df.count()

122

In [18]:
logger.info('Dropping column "reqbytes" from DataFrame')
df = df.drop('reqbytes')
logger.info('Dropped column "reqbytes" from DataFrame')

In [19]:
df.dtypes

[('lang', 'string'),
 ('page', 'string'),
 ('views', 'int'),
 ('timestamp', 'timestamp')]

In [20]:
# from pyspark.sql.types import BooleanType
# sqlctx.udf.registerJavaFunction("group_concat", "org.wikipedia.spark.udf.GroupConcat", BooleanType())

In [113]:
new_schema = StructType([StructField("lang", StringType(), False),
                         StructField("page", StringType(), False),
                         StructField("day", StringType(), False),
                         StructField("enc", StringType(), False)])

In [120]:
# @pandas_udf(schema, functionType=PandasUDFType.GROUP_MAP)
# def g(df):
#     result = pd.DataFrame(df.groupby(df.key).apply(
#         lambda x: x.loc[:, ["value1", "value2"]].min(axis=1).mean()
#     ))
#     result.reset_index(inplace=True, drop=False)
#     return result

from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd

hour_to_letter = ['A','B','C','D','E','F','G','H','I','J','K','L','M','N','O',
                  'P','Q','R','S','T','U','V','W','X']

@pandas_udf(new_schema, PandasUDFType.GROUPED_MAP)
def concat_hours(x):
    view_hours = x['hour'].tolist()
    view_views = x['views'].tolist()

    view_hours_letters = [hour_to_letter[h] for h in view_hours]

    encoded_views = [l + str(h)
                     for l, h in sorted(zip(view_hours_letters,view_views))]
    encoded_views_string = ''.join(encoded_views)

    # return pd.DataFrame({'page': x.page, 'lang': x.lang,'day': x.day, 'enc': encoded_views_string}, index=[x.index[0]])
    return pd.DataFrame({'enc': x.page, 'day': x.lang, 'lang': x.day, 'page': encoded_views_string}, index=[x.index[0]])

In [121]:
# grouped_df = (df.select(['lang',
#                          'page',
#                          functions.date_format('timestamp','yyyy-MM-dd').alias('day'), 
#                          functions.hour('timestamp').alias('hour'), 
#                          'views'
#                          ])
#                 .groupby(['lang','page','day'])
#                 .apply(concat_hours)
#                 .dropDuplicates()
#                 )

In [122]:
from pyspark.sql import functions
grouped_df = (df.select(['lang',
                         'page',
                         functions.date_format('timestamp','yyyy-MM-dd').alias('day'), 
                         functions.hour('timestamp').alias('hour'), 
                         'views'
                         ])
                .groupby(['lang','page','day'])
                )

In [123]:
grouped_df = (grouped_df.apply(concat_hours)
                        .dropDuplicates()
                        )

In [124]:
grouped_df.show()

+----+--------------------+----------+--------------------+
|lang|                page|       day|                 enc|
+----+--------------------+----------+--------------------+
|  en|        Albert_Cahen|2007-12-10|                  J1|
|  en|     Albert_Campbell|2007-12-10|            A1D1F1I1|
|  en|Albert_C._L._G._G...|2007-12-10|                  J1|
|  en|Albert_Cardinal_V...|2007-12-10|                  E1|
|  en|        Albert_Canal|2007-12-10|            A2C1G2I1|
|  en|Albert_Camus#Oppo...|2007-12-10|                  E1|
|  en|       Albert_Brooks|2007-12-10|                 G16|
|  en|      Albert_Cashier|2007-12-10|          A2B1D7E1H2|
|  en|     Albert_Castillo|2007-12-10|              B1E1H1|
|  en|    Albert_C._Outler|2007-12-10|                  H2|
|  en|   Albert_Carotenuto|2007-12-10|                D1H1|
|  en|     Albert_C._Field|2007-12-10|                  G1|
|  en|Albert_Camus#Summ...|2007-12-10|                  D1|
|  en|       Albert_Cavens|2007-12-10|  

In [145]:
from pyspark.sql.functions import col
result = df.select([col('lang').alias('result_lang'),
                    col('page').alias('result_page'),
                    functions.date_format('timestamp','yyyy-MM-dd').alias('result_day')
                    ])

In [146]:
result.show()

+-----------+--------------------+----------+
|result_lang|         result_page|result_day|
+-----------+--------------------+----------+
|         en|Albert_Bushnell_Hart|2007-12-10|
|         en|    Albert_C._Outler|2007-12-10|
|         en|     Albert_Calmette|2007-12-10|
|         en|Albert_Campbell_S...|2007-12-10|
|         en|      Albert_Campion|2007-12-10|
|         en|        Albert_Camus|2007-12-10|
|         en|Albert_Camus/the_...|2007-12-10|
|         en|      Albert_Cardozo|2007-12-10|
|         en|    Albert_Carnesale|2007-12-10|
|         en|   Albert_Carotenuto|2007-12-10|
|         en|      Albert_Cashier|2007-12-10|
|         en|     Albert_Castillo|2007-12-10|
|         en|      Albert_Bunjaku|2007-12-10|
|         en|Albert_C._L._G._G...|2007-12-10|
|         en|    Albert_C._Vaughn|2007-12-10|
|         en|     Albert_Campbell|2007-12-10|
|         en|Albert_Campbell_C...|2007-12-10|
|         en|        Albert_Camus|2007-12-10|
|         en|        Albert_Canal|

In [148]:
cond = [result.result_lang == grouped_df.lang, result.result_page == grouped_df.page, result.result_day == grouped_df.day]
final = result.join(grouped_df, cond).select(['result_lang','result_page','result_day','enc'])

In [150]:
final.dropDuplicates().show()

+-----------+--------------------+----------+--------------------+
|result_lang|         result_page|result_day|                 enc|
+-----------+--------------------+----------+--------------------+
|         en|Albert_Camus#Furt...|2007-12-10|                  E1|
|         en|Albert_Camuscolum...|2007-12-10|                  J1|
|         en|    Albert_Carnesale|2007-12-10|                G4H5|
|         en|      Albert_Bunjaku|2007-12-10|                  A1|
|         en|        Albert_Camus|2007-12-10|A150B148C197D173E...|
|         en|      Albert_Cadwell|2007-12-10|                  D1|
|         en|     Albert_C._Field|2007-12-10|                  G1|
|         en|Albert_Camus#Oppo...|2007-12-10|                  E1|
|         en|       Albert_Caquot|2007-12-10|            C1E1G1I1|
|         en|Albert_CamusNon-f...|2007-12-10|                  I1|
|         en|      Albert_Celades|2007-12-10|              A3B1J2|
|         en|      Albert_Cashier|2007-12-10|          A2B1D7E