In [8]:
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import (
    array,
    col,
    concat,
    explode,
    lit,
    struct,
    to_date,
    to_utc_timestamp,
    udf,
    when,
)
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
# DataFrameReader taken from the `spark` session object (not created in this cell).
reader = spark.read
# Glob over the compaction_1 directory of the 2021/10/18 daily AdTrainingDataEvent path.
path = "/data/tracking/streaming/AdTrainingDataEvent/daily/2021/10/18/compaction_1/*"
# Read every file matched by the glob as ORC into one DataFrame.
data = reader.format("orc").load(path)

In [85]:
from pyspark.sql.functions import row_number,lit
from pyspark.sql.window import Window
# Attach a consecutive, 1-based `row_num` to every row so later cells can slice
# the data into fixed-size chunks.
# NOTE: ordering by a constant literal with no partitionBy means the numbering
# order is arbitrary and the whole dataset goes through a single window
# partition; row_num is only meaningful as a chunking key.
w = Window.orderBy(lit('A'))
# Derive from `data` (the DataFrame loaded in the first cell) instead of a
# pre-existing `df`, so the cell works on a fresh kernel and re-running it
# always yields the same result.
df = data.withColumn("row_num", row_number().over(w))
print(df.columns)

In [86]:
def get_sized_df(df, step_amount=100000):
    """Yield consecutive slices of ``df`` selected by its ``row_num`` column.

    Args:
        df: DataFrame-like object exposing ``count()``, ``filter()`` and a
            ``row_num`` column. ``row_num`` is expected to be the consecutive
            1-based index produced by ``row_number()``.
        step_amount: Maximum number of rows per slice (default 100000).

    Yields:
        One filtered frame per slice, covering row_num ``1..total`` with no
        gaps; every slice except possibly the last holds ``step_amount`` rows.

    Raises:
        ValueError: if ``step_amount`` is not positive (raised when the
            generator is first advanced).
    """
    if step_amount <= 0:
        raise ValueError("step_amount must be a positive integer")

    total = df.count()
    # row_num starts at 1, so the windows must be [1, 1+step), [1+step, ...)
    # up to and including `total`. Starting at 0 made the first slice one row
    # short and dropped the last row whenever total was a multiple of the step.
    for start in range(1, total + 1, step_amount):
        stop = start + step_amount
        yield df.filter((df.row_num >= start) & (df.row_num < stop))

# Keep only the `requestContext` column plus the `row_num` chunking key.
rc = df.select("requestContext", "row_num")
# get_sized_df is a generator function: nothing is counted or filtered until
# the generator is iterated, and it can be consumed only once.
generator = get_sized_df(rc)

#print(next(generator).count())

In [None]:
# Accumulates the union of requestContext keys returned for each chunk.
stored_features = set()

def get_feature_set(df):
    """Return the set of keys present in the ``requestContext`` column of ``df``.

    Args:
        df: DataFrame-like object whose ``collect()`` returns rows that can be
            indexed by ``"requestContext"``; each value is a mapping or None.

    Returns:
        set: every distinct key seen across the collected rows (empty set when
        there are no rows or all contexts are null/empty).
    """
    feature_names = set()
    # Collect the frame that was passed in. The previous code collected the
    # global `rc`, i.e. the full dataset, on every call regardless of `df`.
    for row in df.collect():
        context = row["requestContext"]
        if context:  # skip null or empty contexts instead of failing on .keys()
            feature_names.update(context.keys())
    return feature_names


# Fold the keys found in each chunk into the running union.
# The loop variable is named `chunk` rather than `df` so the notebook-level
# DataFrame `df` is not overwritten by the last chunk.
for chunk in generator:
    stored_features.update(get_feature_set(chunk))

# Print the distinct keys in sorted order, one per line.
for feature_name in sorted(stored_features):
    print(feature_name)

# Request Context Features
- ENGAGEMENT
- LEGACY
- WEBSITE_VISIT
- activity_id
- adFormat
- advertiser_id
- auction_position
- campaign_chargeability
- campaign_cost_type
- campaign_id
- campaign_objective_type
- campaign_type
- channel_id
- channel_position
- clicked
- cost
- creative_id
- device
- feed_tracking_id
- format
- impressionDiscountFactor
- impression_duration
- impression_id
- member_id
- modelType
- modelVersion
- originalPctr
- parameters
- pctr
- position
- positionCorrectionFactor
- position_layout
- position_page
- request_id
- request_impression_delay
- scoring_tracking_time
- swap_context
- tracking_time
- tscpHitAttribute
- viewed

In [30]:
# Register the DataFrame as temp view `data` (replacing any existing view of that name).
data.createOrReplaceTempView('data')

In [103]:
%%spark -c sql -o local_pandas_df -n 16808591
-- Full scan of the `data` view; the magic line's -o / -n options control the local output variable and row cap (see sparkmagic docs).
select * from data

In [9]:
%%local
#for row in local_pandas_df.itertuples():
#    print(row)

In [96]:
%%local
import numpy as np
import math


# Feature columns, in the exact column order of the design matrix Xs.
# Defined in this cell because the coefficient report further down this cell
# indexes `metrics`; relying on another cell for it breaks Restart & Run All.
metrics = ['awareness_0_1', 'awareness_2_3', 'awareness_4_7', 'awareness_8_30', 'awareness_31_60', 'awareness_61_90', 'awareness_91_120', 'awareness_121_150', 'awareness_151_180', 'awareness_181_210', 'awareness_211_240', 'awareness_241_270', 'awareness_271_300', 'awareness_301_330', 'awareness_331_360', 'visit_0_1', 'visit_2_3', 'visit_4_7', 'visit_8_30', 'visit_31_60', 'visit_61_90', 'visit_91_120', 'visit_121_150', 'visit_151_180', 'visit_181_210', 'visit_211_240', 'visit_241_270', 'visit_271_300', 'visit_301_330', 'visit_331_360']

# Design matrix: one row per record, one column per entry of `metrics`.
# Selecting the columns in one vectorised step replaces a per-row Python loop
# with 30 hand-typed attributes, so names and columns cannot drift apart.
Xs = local_pandas_df[metrics].to_numpy()

# Target: element-wise maximum of the two summation columns, kept as an
# (n, 1) column so the later `ys.ravel()` calls see the same shape as before.
ys = np.maximum(
    local_pandas_df["awareness_summation"].to_numpy(),
    local_pandas_df["visit_summation"].to_numpy(),
).reshape(-1, 1)

    

import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.tree import DecisionTreeRegressor
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
import pandas as pd
import numpy as np
from sklearn import datasets, linear_model
from sklearn.linear_model import LinearRegression
import statsmodels.api as sm
from scipy import stats

# Make sure features and targets are ndarrays before fitting.
Xs = np.array(Xs)
ys = np.array(ys)#.reshape(-1,1)
#reg = LinearRegression().fit(Xs, ys)
# model = DecisionTreeRegressor()
# NOTE(review): this fitted forest is never read afterwards in this cell: the
# feature-importance printout below is commented out, and `model` / `r` are
# reassigned by the logistic regression that follows.
model = RandomForestClassifier(random_state=0)
r = model.fit(Xs,ys.ravel())

#for x in sorted(zip(r.feature_importances_, metrics), reverse = True):
#    print(x)

# Two blank lines of output.
print()
print()

from collections import defaultdict

import numpy as np

# Fit a logistic regression on the full data set; `model` and `r` are rebound
# to this estimator (fit returns the estimator itself).
model = LogisticRegression(max_iter=1000000)
r = model.fit(Xs, ys.ravel())

# `metrics` must name the columns of Xs in order; fail loudly on a mismatch
# instead of mislabelling or silently skipping coefficients.
if len(metrics) != r.coef_.shape[1]:
    raise ValueError(
        "metrics has {0} names but the model has {1} coefficients per row".format(
            len(metrics), r.coef_.shape[1]
        )
    )

# d maps feature name -> that feature's coefficient from every row of r.coef_.
d = defaultdict(list)
for coef_row in r.coef_:
    for name, coef in zip(metrics, coef_row):
        d[name].append(coef)

# Per feature: (median, mean, variance) of its coefficients across coef_ rows.
for name in metrics:
    values = (np.median(d[name]), np.mean(d[name]), np.var(d[name]))
    print("{0}: {1};".format(name, values))

In [86]:
%local
# Display the first row of the most recently fitted model's coefficient matrix.
r.coef_[0]

In [42]:
%%local
# Show both lengths side by side. A cell only displays its last expression, so
# a bare `len(ys)` on its own line was evaluated and silently discarded.
len(ys), len(local_pandas_df)

In [54]:
%%local
from sklearn.linear_model import LogisticRegression
#from sklearn.model_selection import train_test_split
from sklearn.model_selection import cross_validate
# Feature names, listed in the same column order as the design matrix Xs.
metrics = ['awareness_0_1', 'awareness_2_3', 'awareness_4_7', 'awareness_8_30', 'awareness_31_60', 'awareness_61_90', 'awareness_91_120', 'awareness_121_150', 'awareness_151_180', 'awareness_181_210', 'awareness_211_240', 'awareness_241_270', 'awareness_271_300', 'awareness_301_330', 'awareness_331_360', 'visit_0_1', 'visit_2_3', 'visit_4_7', 'visit_8_30', 'visit_31_60', 'visit_61_90', 'visit_91_120', 'visit_121_150', 'visit_151_180', 'visit_181_210', 'visit_211_240', 'visit_241_270', 'visit_271_300', 'visit_301_330', 'visit_331_360']

# 10-fold cross-validation; return_estimator=True keeps each fold's fitted
# estimator so its coefficients can be inspected below.
model = LogisticRegression(max_iter=100000)
cv = cross_validate(model, Xs, ys.ravel(), cv=10, return_estimator=True)
print(cv['test_score'])
print(cv['test_score'].mean())
print(cv['estimator'][0].coef_[0])

# For every fold and every coefficient row, list features from largest to
# smallest coefficient. The loop variable is not called `model`, so the
# estimator template defined above is not overwritten by the last fold.
for fold_estimator in cv['estimator']:
    for coef_row in fold_estimator.coef_:
        for coef, name in sorted(zip(coef_row, metrics), reverse=True):
            print("{0}: {1}".format(name, coef))