In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as sf
from pyspark.sql.functions import sum
import datetime

In [2]:
# Analysis period: all requests of June 2018.
# Both bounds are inclusive and written as YYYYMMDD strings.
startDate, endDate = '20180601', '20180630'

In [3]:
# strptime/strftime pattern for the compact YYYYMMDD date strings used for the
# period bounds and in the pageviews file names.
# NOTE(review): this name shadows the builtin format(); it is kept because
# later cells refer to `format`.
format = "%Y%m%d"

In [4]:
# Create (or reuse, if one is already running) a local Spark session
# that uses all available cores.
sess = (
    SparkSession.builder
    .master("local[*]")
    .appName("mmds2")
    .getOrCreate()
)

In [5]:
# Interlanguage link records of Ukrainian Wikipedia: the CSV produced by
# converting the ukwiki-20180701 langlinks.sql.gz dump.
# The first row of the CSV holds the column names.
df_lang = (
    sess.read
    .format("csv")
    .option("header", "true")
    .load("./data/langlinks/ukwiki-20180701-langlinks.csv")
)

In [6]:
# Sanity check: collect every interlanguage link of a single page (id 1273710).
# `id` is read as a string column, so Spark casts for this comparison.
test = df_lang.where(df_lang["id"] == 1273710)

In [7]:
test.show(n=20)  # print up to 20 rows of the spot-check

+-------+----+-----------------+
|     id|lang|            title|
+-------+----+-----------------+
|1273710|  de|PHP-Beschleuniger|
|1273710|  en|  PHP accelerator|
|1273710|  es|  PHP accelerator|
|1273710|  lv| PHP paātrinātājs|
|1273710|  ru|  Акселератор PHP|
|1273710|  zh|          PHP加速軟件|
+-------+----+-----------------+



In [8]:
# Links that point to English Wikipedia. The aggregation loop uses them to
# drop ukwiki pages that already have an English counterpart.
dl = df_lang.where(df_lang["lang"] == "en")

In [9]:
# take a look at the schema: the CSV is read with a header but without schema
# inference, so every column comes back as a string
dl.printSchema()

root
 |-- id: string (nullable = true)
 |-- lang: string (nullable = true)
 |-- title: string (nullable = true)



In [10]:
# Preview the English links (up to 20 rows).
# NOTE(review): `title` prints as null in the output below, possibly an artifact
# of the SQL-to-CSV conversion (TODO confirm). Only `id` is used later.
dl.show(n=20)

+-------+----+-----+
|     id|lang|title|
+-------+----+-----+
|      3|  en| null|
|  93215|  en| null|
| 109484|  en| null|
| 122051|  en| null|
| 130506|  en| null|
| 609236|  en| null|
| 621969|  en| null|
| 772924|  en| null|
| 907757|  en| null|
| 942309|  en| null|
|1336102|  en| null|
|1627304|  en| null|
|1631181|  en| null|
|1979525|  en| null|
|1979527|  en| null|
|1979528|  en| null|
|1982832|  en| null|
|1983737|  en| null|
|2002192|  en| null|
|2052178|  en| null|
+-------+----+-----+
only showing top 20 rows



In [11]:
# Base per-page data of Ukrainian Wikipedia: the CSV produced by converting the
# ukwiki-20180701 page.sql.gz dump. The first row holds the column names.
df_page = (
    sess.read
    .format("csv")
    .option("header", "true")
    .load("./data/page/ukwiki-20180701-page.csv")
)

In [12]:
# take a look at the schema: no schema inference is requested, so every column
# (including page_id and page_namespace) is a string
df_page.printSchema()

root
 |-- page_id: string (nullable = true)
 |-- page_namespace: string (nullable = true)
 |-- title: string (nullable = true)
 |-- page_restrictions: string (nullable = true)
 |-- page_counter: string (nullable = true)
 |-- page_is_redirect: string (nullable = true)
 |-- page_is_new: string (nullable = true)
 |-- page_random: string (nullable = true)
 |-- page_touched: string (nullable = true)
 |-- page_links_updated: string (nullable = true)
 |-- page_latest: string (nullable = true)
 |-- page_len: string (nullable = true)
 |-- page_no_title_convert: string (nullable = true)
 |-- page_content_model: string (nullable = true)
 |-- page_lang: string (nullable = true)



In [13]:
# Parse both period bounds (YYYYMMDD strings) into datetime objects.
start, end = (datetime.datetime.strptime(bound, format) for bound in (startDate, endDate))

In [14]:
# Advance one calendar day per iteration when walking the period.
step = datetime.timedelta(hours=24)

In [15]:
# For every day of the period:
#   1. read that day's hourly pageviews dumps,
#   2. keep views of Ukrainian Wikipedia articles that have no English
#      interlanguage link,
#   3. sum the requests per page,
#   4. append the daily totals to ./pageviews.parquet.
#
# NOTE(review): the output is written with mode 'append'. Re-running this cell
# appends the same days again and duplicates rows in ./pageviews.parquet, so
# remove that dataset before a full re-run.
from pyspark.sql.types import LongType, StringType, StructField, StructType

# Layout of the space-separated hourly pageviews lines (4 fields):
# project code, page title, number of views, response size.
# An explicit schema avoids the extra pass over each day's data that
# inferSchema needs.
pageviews_schema = StructType([
    StructField("project", StringType()),
    StructField("title", StringType()),
    StructField("requests", LongType()),
    StructField("response_size", LongType()),
])

# Only main-namespace pages (page_namespace "0") are articles. page.title has
# no namespace prefix, so without this filter an article title would also match
# talk/category/template pages with the same bare title and credit them with
# the article's views.
# TODO(review): redirect pages (page_is_redirect) are still included; decide
# whether their views should count.
articles = df_page.filter(df_page.page_namespace == "0").select("page_id", "title")

# Walk the period with a separate cursor so `start` is not mutated. This lets
# the cell be re-run without re-running the cell that defines `start`.
day = start
while day <= end:

    date_str = day.strftime(format)

    print("Date: " + date_str)

    # The glob matches all hourly files of this date (hours 00 to 23).
    pageviews = (
        sess.read
        .option("delimiter", " ")
        .schema(pageviews_schema)
        .csv("./data/pageviews/pageviews-" + date_str + "*")
    )

    # Keep only the Ukrainian project.
    pageviews_uk = pageviews.filter(pageviews.project == "uk")

    # Inner join on title to attach page_id. Viewed titles that do not match
    # an article are dropped.
    pageviews_uk_with_id = pageviews_uk.join(articles, "title")

    # Left anti join with the English langlinks keeps only articles with no
    # English counterpart.
    pageviews_uk_not_en = pageviews_uk_with_id.join(
        dl, pageviews_uk_with_id.page_id == dl.id, "left_anti"
    )

    # Daily total of requests per page. Use sf.sum rather than the imported
    # `sum`, which shadows the Python builtin.
    ds = pageviews_uk_not_en.groupBy("page_id").agg(sf.sum("requests").alias("sum_requests"))

    ds.show()

    # ds already has exactly (page_id, sum_requests); tag it with the day.
    enDS = ds.withColumn("date", sf.lit(date_str))

    # Append the daily pageviews to parquet (see the NOTE above about re-runs).
    enDS.write.format("parquet").mode("append").save("./pageviews.parquet")

    day += step

Date: 20180601
+-------+------------+
|page_id|sum_requests|
+-------+------------+
|1006370|          13|
|1023078|           1|
|1037752|           2|
|1040262|           1|
|1043412|           4|
|1046207|           2|
|1048314|           1|
|1053155|           7|
|1055177|           1|
| 105705|           1|
| 105758|           1|
| 106022|           3|
|1065129|           2|
|1065294|          10|
|1065828|           1|
|1066479|           3|
|1076191|           1|
|1077590|           2|
|1081405|          10|
|1082047|           3|
+-------+------------+
only showing top 20 rows

Date: 20180602
+-------+------------+
|page_id|sum_requests|
+-------+------------+
|1006370|          15|
|1021617|           1|
|1023954|           1|
|1037752|           2|
|1038733|           1|
|1040262|           2|
|1048314|           1|
|1053155|           1|
| 105758|           1|
| 106022|           4|
|1065129|           1|
|1065294|           1|
|1065828|           1|
|1077590|           1|
|

Date: 20180615
+-------+------------+
|page_id|sum_requests|
+-------+------------+
|1003644|           1|
|1006370|          15|
|1012551|           1|
|1015827|           1|
|1023531|           1|
|1043412|           1|
|1046207|           3|
|1050825|           2|
|1053155|           1|
|1057593|           1|
| 106022|           1|
| 106090|           1|
|1065129|           2|
|1065294|           3|
|1066479|           5|
|1080693|           1|
|1081405|           4|
|1082047|           4|
|1090656|           1|
|1091164|           1|
+-------+------------+
only showing top 20 rows

Date: 20180616
+-------+------------+
|page_id|sum_requests|
+-------+------------+
|1003644|           1|
|1006370|          16|
|1021617|           1|
|1023078|           1|
|1023531|           1|
|1028644|           1|
|1037752|           2|
|1043412|           1|
|1046207|           2|
| 104870|           1|
| 105705|           1|
|1057378|           1|
| 105758|           1|
| 106022|           3|
|

Date: 20180629
+-------+------------+
|page_id|sum_requests|
+-------+------------+
|1006370|           5|
|1023531|           1|
|1037101|           1|
|1040262|           1|
|1048314|           1|
|1053155|           3|
|1055177|           1|
| 105705|           1|
|1057378|           1|
| 105758|           1|
|1057593|           1|
| 106022|           1|
|1065129|           1|
|1065294|           1|
|1065682|           1|
|1065828|           2|
|1066069|           1|
|1066479|           3|
|1077590|           1|
|1081405|           1|
+-------+------------+
only showing top 20 rows

Date: 20180630
+-------+------------+
|page_id|sum_requests|
+-------+------------+
|1006370|           8|
|1012551|           1|
|1023531|           1|
|1028644|           1|
|1028909|           1|
|1033524|           1|
|1038555|           1|
|1040262|           1|
|1043412|           2|
|1046207|           1|
|1053155|           1|
| 105758|           1|
| 106022|           1|
|1065129|           2|
|