In [1]:
import time
from pyspark.sql import SQLContext, Row, HiveContext
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType
import pandas as pd

24 heures

In [2]:
# Build the HiveContext used for every DataFrame operation below.
# NOTE(review): `sc` is not defined in this notebook; presumably the
# SparkContext injected by the pyspark kernel/shell — confirm.
hc = HiveContext(sc)

In [3]:
# Read table `wikipediadata` of keyspace `projet` through the Cassandra data source.
wikipediadata = (
    hc.read
    .format("org.apache.spark.sql.cassandra")
    .load(keyspace="projet", table="wikipediadata")
)

In [5]:
# Day to process: compared against the `day` column when filtering the table
# and embedded in the output CSV name and in the timing log entry.
day = 60

In [104]:
# Start timestamp (seconds since the epoch) for the run-time measurement
# that is written to the log at the end of the notebook.
t0 = time.time()

In [6]:
# Keep only the rows recorded for the selected day.
day_rows = wikipediadata.where(wikipediadata.day == day)

# Test subset: the 'Cadillac_Brougham' page on any project, plus 'Roald_Dahl'
# restricted to the 'fr' project.
is_cadillac = day_rows.page == 'Cadillac_Brougham'
is_french_dahl = (day_rows.page == 'Roald_Dahl') & (day_rows.projectcode == 'fr')
sums = day_rows.where(is_cadillac | is_french_dahl)

# Cache the subset: it is reused by several later cells.
sums.cache()

DataFrame[projectcode: string, page: string, day: int, hour: int, views: bigint]

In [7]:
# Window frame limited to the single preceding row inside each
# (projectcode, page) partition, rows being ordered by ascending hour.
# NOTE(review): "preceding row" equals "previous hour" only if every hour
# is present for the page — confirm against the data.
window_spec = (
    Window
    .partitionBy(sums.projectcode, sums.page)
    .orderBy(sums.hour.asc())
    .rowsBetween(-1, -1)
)

In [8]:
# Views of the preceding row: the window frame holds exactly one row,
# so summing over it simply returns that row's views.
previous_views = F.sum(sums.views).over(window_spec)

# diff = views(h) - views(h-1)
diffs = sums.withColumn('diff', sums.views - previous_views)

In [10]:
# Weight applied to each hour of the day: coef = 1 / (24 - hour),
# i.e. 1/24 for hour 0 up to 1 for hour 23.
hourly_coefs = pd.DataFrame({'hour': range(24)})
hourly_coefs['coef'] = 1. / (24. - hourly_coefs['hour'])

# Ship the small lookup table to Spark and attach the weight to every row by hour.
coefs = hc.createDataFrame(hourly_coefs)
diffs = diffs.join(coefs, 'hour')

In [11]:
# Per-row contribution to the score: the hour-over-hour difference
# multiplied by the weight of that hour.
weighted_diff = diffs['diff'] * diffs['coef']
diffs = diffs.withColumn('sub_score', weighted_diff)

In [81]:
# Sum views and weighted differences per (projectcode, page).
totals = diffs.groupby('projectcode', 'page').sum('views', 'sub_score')

# Normalise the summed sub-scores by the square root of the total views.
score_col = totals['SUM(sub_score)'] / F.sqrt(totals['SUM(views)'])

# Keep the 10 best-scoring pages, exposing the view total as `total_views`.
totals = (
    totals
    .withColumn('score', score_col)
    .orderBy(F.desc('score'))
    .withColumnRenamed('SUM(views)', 'total_views')
    .limit(10)
)

In [83]:
# Hourly views of the selected pages only: right outer join of the hourly rows
# onto the top-10 table, matched on (projectcode, page).
# Both sides select projectcode and page, so the result carries each of those
# two columns twice (left-hand copies first).
hourly_views = sums.select('projectcode', 'page', 'hour', 'views')
top_pages = totals.select('projectcode', 'page', 'total_views', 'score')
same_page = (totals.projectcode == sums.projectcode) & (totals.page == sums.page)
views = hourly_views.join(top_pages, same_page, 'right_outer')

In [85]:
# Bring the top-10 summary (one row per page) to the driver as a pandas frame.
summary_columns = ['projectcode', 'page', 'total_views', 'score']
df = totals.select(*summary_columns).toPandas()

In [86]:
# Collect the hourly views of the selected pages into a pandas DataFrame.
df2 = views.toPandas()

In [87]:
# Drop the first two columns by position. Given how `views` is built, these are
# the left-hand copies of projectcode and page; the right-hand copies remain.
# Positional slicing is used because the column names are duplicated.
df2 = df2.iloc[:, 2:]

In [93]:
# Reshape to one row per (projectcode, page) and one column per hour holding
# the views; hours with no data are filled with 0.
df2 = df2.pivot_table(
    index=['projectcode', 'page'],
    columns=['hour'],
    values='views',
    fill_value=0,
)

In [96]:
# Append the per-hour view columns to the summary: the summary's
# (projectcode, page) columns are matched against the pivot table's index.
df = pd.merge(
    df,
    df2,
    left_on=['projectcode', 'page'],
    right_index=True,
)

In [103]:
# Write the result to a CSV named after the processed day, without the index.
csv_name = 'day_{}_24htrending.csv'.format(day)
df.to_csv(csv_name, index=False)

In [None]:
# End timestamp for the run-time measurement (paired with `t0`).
# NOTE(review): the previous comment described a day-over-day views difference,
# which this cell does not compute.
t1 = time.time()

In [None]:
# Elapsed wall-clock time between the two timestamps, split into whole
# minutes and remaining whole seconds.
d = t1 - t0
m = int(d // 60)
s = int(d % 60)

# The log is opened in append mode, so each run adds one entry. write() does
# not add a line terminator; the explicit trailing newline keeps successive
# entries on separate lines instead of running them together.
with open('/home/ubuntu/projetnosql/log_time', 'a') as f:
    f.write('day {} written in {} min and {} sec\n'.format(day, m, s))