In [3]:
from pyspark.sql.types import *

# Load the scored rows. `spark` is not created in this notebook; it is assumed
# to be a SparkSession supplied by the environment — TODO confirm.
# The printed schema shows `probability` stored as a string, which is why the
# next cell casts it to FloatType before computing quantiles.
df = spark.read.parquet("/mnt/decile_df.parquet")
df.show(5)
df.printSchema()

+--------------------+--------------------+--------------+
|                 PEL|            features|   probability|
+--------------------+--------------------+--------------+
|loA7jZOEIdPQuovau...|[-0.0941031108144...|0.472279920779|
|9RUjG7kHNfyuf+ccL...|[-0.2426578816902...|0.547101393255|
|9RUjG7kHNfyuf+ccL...|[-0.1134347918012...| 0.41549351372|
|9RUjG7kHNfyuf+ccL...|[1.6445755782986,...|0.377368624576|
|loA7jZOEIdPQuovau...|[0.06168342750406...|0.417438637431|
+--------------------+--------------------+--------------+
only showing top 5 rows

root
 |-- PEL: string (nullable = true)
 |-- features: vector (nullable = true)
 |-- probability: string (nullable = true)



In [10]:
# `probability` was read as a string (see the schema above); cast it so the
# quantiles are computed numerically.
df = df.withColumn("probability", df["probability"].cast(FloatType()))

# relativeError=0 requests exact quantiles: 11 cut points bound 10 deciles.
deciles = df.approxQuantile("probability", [n / 10.0 for n in range(11)], 0)

# Consecutive cut points form each decile's (lower, upper) bounds.
buckets = list(zip(deciles[:-1], deciles[1:]))

# Bare last expression: Jupyter displays it, and unlike the Python 2
# `print buckets` statement it is also valid under Python 3.
buckets

[(0.34745725989341736, 0.4102127254009247), (0.4102127254009247, 0.42469552159309387), (0.42469552159309387, 0.435580313205719), (0.435580313205719, 0.44801414012908936), (0.44801414012908936, 0.46979403495788574), (0.46979403495788574, 0.5170090794563293), (0.5170090794563293, 0.5499310493469238), (0.5499310493469238, 0.5757752060890198), (0.5757752060890198, 0.6035268306732178), (0.6035268306732178, 0.7928127646446228)]


In [11]:
from pyspark.sql.functions import udf

def bucket(n):
    """Return the 0-based decile index of probability ``n``.

    Looks ``n`` up in the global ``buckets`` list of ``(lower, upper)`` bounds.
    The first interval with ``lower <= n <= upper`` wins, so a value lying on a
    shared boundary goes to the lower decile. Returns None when ``n`` is None
    or falls outside every interval.
    """
    if n is None:
        return None
    for index, (lower, upper) in enumerate(buckets):
        if lower <= n <= upper:
            return index
    return None


# Declare the return type: udf() defaults to StringType, which would make the
# decile column a string (lexicographic ordering, string grouping keys).
bucket_udf = udf(bucket, IntegerType())

df_decile = df.withColumn("decile", bucket_udf(df.probability))
df_decile.show(5)

+--------------------+--------------------+-----------+------+
|                 PEL|            features|probability|decile|
+--------------------+--------------------+-----------+------+
|loA7jZOEIdPQuovau...|[-0.0941031108144...|  0.4722799|     5|
|9RUjG7kHNfyuf+ccL...|[-0.2426578816902...|  0.5471014|     6|
|9RUjG7kHNfyuf+ccL...|[-0.1134347918012...| 0.41549352|     1|
|9RUjG7kHNfyuf+ccL...|[1.6445755782986,...| 0.37736863|     0|
|loA7jZOEIdPQuovau...|[0.06168342750406...| 0.41743863|     1|
+--------------------+--------------------+-----------+------+
only showing top 5 rows



In [28]:
from pyspark.ml.linalg import Vectors

def vectorAggregator(a, b):
    """Merge two ``(count, vector)`` partial sums into one.

    The counts are added, and the vectors are summed element-wise over the
    length of ``a[1]``. The summed vector is returned as a pyspark DenseVector.
    """
    total_count = a[0] + b[0]
    summed = [b[1][i] + a[1][i] for i in range(len(a[1]))]
    return total_count, Vectors.dense(summed)

In [35]:
# Per decile: accumulate (row count, feature-vector sum), then divide the sum
# by the count to get the mean feature vector.
df_agg = (
    df_decile.rdd
    .map(lambda row: (row.decile, (1.0, row.features)))
    .reduceByKey(lambda left, right: vectorAggregator(left, right))
    .map(lambda kv: (kv[0], [float(total / kv[1][0]) for total in kv[1][1]]))
    .toDF(['Decile', "Feature_Avg"])
)

df_agg.orderBy("Decile").show(truncate=True)

+------+--------------------+
|Decile|         Feature_Avg|
+------+--------------------+
|     0|[0.01342153628039...|
|     1|[-0.0138670235491...|
|     2|[-0.0478363636400...|
|     3|[-0.0560319640370...|
|     4|[-0.0264511323575...|
|     5|[-0.0198613864002...|
|     6|[-0.0425838609558...|
|     7|[-0.0174017489590...|
|     8|[0.04563753454528...|
|     9|[0.10755071676683...|
+------+--------------------+



In [36]:
# mode("overwrite") keeps this cell re-runnable: the default save mode
# ("error") raises if the parquet output already exists.
df_agg.write.mode("overwrite").parquet("/mnt/rentrak_decile_avg.parquet")

In [37]:
from pyspark_dfreport.builder import DataFrameReport

# `DataFrameReport` comes from the `pyspark_dfreport` package, whose behavior
# is not visible here. Presumably it targets an Excel workbook at this path —
# TODO confirm.
report = DataFrameReport("/mnt/rentrak_decile_avg.xlsx")

In [38]:
# Hand the per-decile feature averages (`df_agg`) to the report under the given
# name. Presumably this becomes a sheet in the workbook — TODO confirm
# `save_df` semantics.
report.save_df(df_agg, name="Rentrak Decile Average")

In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors

def average_by_ntile(dataframe, ntile_col, ntile=10.0, features_col="features"):
    """Element-wise average of a vector column within each n-tile of ``ntile_col``.

    Bucket cut points are exact quantiles of ``ntile_col`` (``approxQuantile``
    with ``relativeError=0``). A row whose value lies exactly on a shared cut
    point is assigned to the lower bucket.

    :param dataframe: Spark DataFrame containing ``ntile_col`` and ``features_col``.
    :param ntile_col: column used to form the buckets. It is cast to FloatType
        if it is not already FloatType.
    :param ntile: number of buckets (truncated to int); default 10 gives
        deciles. Earlier versions ignored this argument and always used deciles.
    :param features_col: vector column to average (default ``"features"``).
    :return: DataFrame with columns ``Decile`` (0-based integer bucket index)
        and ``Feature_Avg`` (list of floats), ordered by ``Decile``.
    :raises ValueError: if ``ntile_col`` is not a column of ``dataframe`` or
        ``ntile`` is less than 1.
    """
    from bisect import bisect_left

    from pyspark.ml.linalg import Vectors
    from pyspark.sql.functions import udf
    from pyspark.sql.types import FloatType, IntegerType

    n_buckets = int(ntile)
    if n_buckets < 1:
        raise ValueError("ntile must be >= 1, got %r" % (ntile,))

    # list.index raises ValueError for an unknown column name.
    ntile_col_index = dataframe.columns.index(ntile_col)
    if dataframe.schema[ntile_col_index].dataType != FloatType():
        dataframe = dataframe.withColumn(ntile_col, dataframe[ntile_col].cast(FloatType()))

    # n_buckets + 1 cut points (min ... max) bound n_buckets intervals.
    probabilities = [i / float(n_buckets) for i in range(n_buckets + 1)]
    cut_points = dataframe.approxQuantile(ntile_col, probabilities, 0)
    upper_edges = cut_points[1:]

    def bucket_index(value):
        # Same result as scanning for the first interval with
        # lower <= value <= upper, but O(log n) via bisect.
        if value is None or len(cut_points) < 2:
            return None
        if not (cut_points[0] <= value <= cut_points[-1]):  # also rejects NaN
            return None
        return bisect_left(upper_edges, value)

    # Declare IntegerType: the udf default (StringType) would make orderBy
    # place "10" before "2" once more than 10 buckets are used.
    bucket_udf = udf(bucket_index, IntegerType())
    with_bucket = dataframe.withColumn("decile", bucket_udf(dataframe[ntile_col]))

    def merge_partial_sums(left, right):
        # (row count, vector sum) pairs combine by adding both components;
        # toArray() works for dense and sparse vectors alike.
        return (left[0] + right[0],
                Vectors.dense(left[1].toArray() + right[1].toArray()))

    def to_average(item):
        key, (count, vector_sum) = item
        return key, [float(v / count) for v in vector_sum.toArray()]

    df_agg = (
        with_bucket.rdd
        .map(lambda row: (row.decile, (1.0, row[features_col])))
        .reduceByKey(merge_partial_sums)
        .map(to_average)
        .toDF(['Decile', "Feature_Avg"])
    )

    return df_agg.orderBy("Decile")



In [19]:
# Author tag carried over from a standalone script; nothing in this notebook
# reads it.
__author__ = 'andwest'

import numpy as np


def vector_std(dense_vectors):
    """Column-wise mean and population variance of a collection of vectors.

    Note: despite the function name, the second vector holds *variances*
    (``np.var`` with its default ``ddof=0``), not standard deviations. The
    caller labels that column ``Feature_Var``.

    :param dense_vectors: non-empty sequence of indexable vectors (e.g. pyspark
        DenseVector, list, numpy array). Only the first
        ``len(dense_vectors[0])`` entries of each vector are used.
    :return: ``[Vectors.dense(column_means), Vectors.dense(column_variances)]``.
    :raises ValueError: if ``dense_vectors`` is empty.
    """
    if len(dense_vectors) == 0:
        raise ValueError("vector_std requires at least one vector")

    n_columns = len(dense_vectors[0])

    # Rows = input vectors, columns = features. One numpy reduction per
    # statistic replaces rebuilding a Python list for every column (and the
    # Python-2-only xrange).
    matrix = np.array([[v[i] for i in range(n_columns)] for v in dense_vectors],
                      dtype=float)

    return [Vectors.dense(matrix.mean(axis=0)), Vectors.dense(matrix.var(axis=0))]


# Collect each decile's feature vectors once with groupByKey. The previous
# reduceByKey(lambda a, b: a + b) built a brand-new Python list on every merge,
# which is quadratic in the number of rows per partition.
df_agg = (
    df_decile.rdd
    .map(lambda row: (row.decile, row.features))
    .groupByKey()
    .map(lambda kv: tuple([kv[0]] + vector_std(list(kv[1]))))
    .toDF(['Decile', "Feature_Avg", "Feature_Var"])
)

df_agg.orderBy("Decile").show()

+------+--------------------+--------------------+
|Decile|         Feature_Avg|         Feature_Var|
+------+--------------------+--------------------+
|     0|[0.01342153628039...|[0.55367137940323...|
|     1|[-0.0138670235491...|[0.48581125350170...|
|     2|[-0.0478363636400...|[0.37637168366152...|
|     3|[-0.0560319640370...|[0.41283488548111...|
|     4|[-0.0264511323575...|[0.59463570423574...|
|     5|[-0.0198613864002...|[0.46091158972398...|
|     6|[-0.0425838609558...|[0.37158973389797...|
|     7|[-0.0174017489590...|[0.52197145310078...|
|     8|[0.04563753454528...|[0.73712164018514...|
|     9|[0.10755071676683...|[0.83036558014080...|
+------+--------------------+--------------------+



In [20]:
from pyspark_dfreport.builder import DataFrameReport

# NOTE(review): this report is written under /home/ubuntu, while the earlier
# decile-average report used /mnt. Confirm which location is intended.
report_w_var = DataFrameReport("/home/ubuntu/rentrak_dec_avg_var.xlsx")

# `df_agg` at this point is the mean + variance frame built in the previous cell.
report_w_var.save_df(df_agg, name="rentrak_dec_avg_var")

In [22]:
# NOTE(review): the recorded run of this cell failed with AnalysisException
# ("Path does not exist"). Fix the path, or remove the cell, so Restart & Run
# All succeeds.
census = spark.read.parquet("/mnt/census/decile_df.parquet")

census.show()

AnalysisException: u'Path does not exist: file:/mnt/census/decile_df.parquet;'