In [1]:
from pyspark.sql import SparkSession

In [2]:
import pyspark.sql.functions as F

In [3]:
spark = SparkSession.builder.appName('StepikProjectSession').getOrCreate()

In [4]:
sc = spark.sparkContext

In [5]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

In [6]:
df = spark.read.parquet('./clickstream.parquet')

In [7]:
df.show(3)

+----------+-------------------+-----+--------+------+---------------+-----------------+------------+-------+---------+---------------------+
|      date|               time|event|platform| ad_id|client_union_id|compaign_union_id|ad_cost_type|ad_cost|has_video|target_audience_count|
+----------+-------------------+-----+--------+------+---------------+-----------------+------------+-------+---------+---------------------+
|2019-04-01|2019-04-01 00:00:48| view| android| 45061|          34734|            45061|         CPM|  200.6|        0|              1955269|
|2019-04-01|2019-04-01 00:00:48| view|     web|121288|         121288|           121288|         CPM|  187.4|        0|               232011|
|2019-04-01|2019-04-01 00:01:03| view| android|102737|         102535|           102564|         CPC|   60.7|        0|                 4410|
+----------+-------------------+-----+--------+------+---------------+-----------------+------------+-------+---------+---------------------+
only s

In [8]:
df[['ad_cost_type']].distinct().show()

+------------+
|ad_cost_type|
+------------+
|         CPC|
|         CPM|
+------------+



In [9]:
df.selectExpr('count(*)').show()

+--------+
|count(1)|
+--------+
| 1000000|
+--------+



In [10]:
base_columns = ['ad_id', 'target_audience_count', 'has_video', 'ad_cost', 'ad_cost_type']
base_frame = df[base_columns].drop_duplicates()

In [11]:
base_frame = base_frame.selectExpr('ad_id', 'target_audience_count', 'has_video', 'ad_cost', "cast(ad_cost_type = 'CPM' as int) as is_cpm", "cast(ad_cost_type = 'CPC' as int) as is_cpc")

In [12]:
base_frame.show(3)

+------+---------------------+---------+-------+------+------+
| ad_id|target_audience_count|has_video|ad_cost|is_cpm|is_cpc|
+------+---------------------+---------+-------+------+------+
|120378|                39423|        0|  182.5|     1|     0|
| 31373|              2209090|        0|  205.9|     1|     0|
|110531|              6798067|        0|  200.5|     1|     0|
+------+---------------------+---------+-------+------+------+
only showing top 3 rows



In [13]:
base_frame.selectExpr('count(*)').show()

+--------+
|count(1)|
+--------+
|     965|
+--------+



In [14]:
additional_info = df.groupby('ad_id').agg(F.countDistinct('date').alias('day_count'), F.round((F.sum((df.event == 'click').astype('int'))/F.sum((df.event == 'view').astype('int'))), 6).alias('CTR'))
#"sum(cast(event = 'click' as int))/sum(cast(event = 'view' as int)) as CTR"

In [15]:
adv_frame = base_frame.join(additional_info, on='ad_id', how='leftouter')

In [16]:
adv_frame.selectExpr('count(*)').show()

+--------+
|count(1)|
+--------+
|     965|
+--------+



In [17]:
adv_frame.show(10)

+------+---------------------+---------+-------+------+------+---------+-------+
| ad_id|target_audience_count|has_video|ad_cost|is_cpm|is_cpc|day_count|    CTR|
+------+---------------------+---------+-------+------+------+---------+-------+
|120378|                39423|        0|  182.5|     1|     0|        2|0.00691|
| 31373|              2209090|        0|  205.9|     1|     0|        1|    0.0|
|110531|              6798067|        0|  200.5|     1|     0|        2| 9.8E-4|
| 41791|                 2655|        0|  201.8|     1|     0|        2|0.00794|
|116295|                54467|        0|  207.6|     1|     0|        2|    0.0|
| 38651|                92362|        0|  190.0|     1|     0|        2|    0.0|
| 18277|                55801|        0|  185.1|     1|     0|        2|    0.0|
| 41279|                10440|        0|  186.3|     1|     0|        2|0.04762|
| 20696|                 6638|        0|  205.5|     1|     0|        2|    0.0|
|111127|                9114

In [18]:
adv_frame.randomSplit([.75, .25])

[DataFrame[ad_id: int, target_audience_count: decimal(10,0), has_video: int, ad_cost: double, is_cpm: int, is_cpc: int, day_count: bigint, CTR: double],
 DataFrame[ad_id: int, target_audience_count: decimal(10,0), has_video: int, ad_cost: double, is_cpm: int, is_cpc: int, day_count: bigint, CTR: double]]

In [None]:
adv_frame.write.parquet

In [20]:
import os

In [22]:
target_path = 'pathname'

In [23]:
proportions = [.75, .25]
dirnames = ['train', 'test']
results = adv_frame.randomSplit(proportions)
for dirname, result in zip(dirnames, results):
    target_dir = os.path.join(target_path, dirname)
    file_name = os.path.join(target_dir, f'{dirname}_frame.parquet')
    print(target_dir)
    print(file_name)

pathname\train
pathname\train\train_frame.parquet
pathname\test
pathname\test\test_frame.parquet


In [None]:
file_name = os.path.join(target_dir, f'{dirname}_frame.parquet')

In [32]:
input_path, target_path = 'clickstream.parquet', 'result'
df = spark.read.parquet(input_path)
base_columns = ['ad_id', 'target_audience_count',
                'has_video', 'ad_cost', 'ad_cost_type']
base_frame = df[base_columns].drop_duplicates()
base_frame = base_frame.selectExpr(
    'ad_id',
    'target_audience_count',
    'has_video',
    "cast(ad_cost_type = 'CPM' as int) as is_cpm",
    "cast(ad_cost_type = 'CPC' as int) as is_cpc",
    'ad_cost'
)
additional_info = df.groupby('ad_id').agg(
    F.countDistinct('date').alias('day_count'),
    F.round((F.sum((df.event == 'click').astype('int')) /
             F.sum((df.event == 'view').astype('int'))), 6).alias('CTR')
)
adv_frame = base_frame.join(additional_info, on='ad_id', how='leftouter')
proportions = [.75, .25]
dirnames = ['train', 'test']
results = adv_frame.randomSplit(proportions)
os.makedirs(target_path, exist_ok=True)
for dirname, result in zip(dirnames, results):
    target_dir = os.path.join(target_path, dirname)
    result.write.parquet(target_dir)

Py4JJavaError: An error occurred while calling o420.parquet.
: java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:736)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:271)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:287)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:865)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:547)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:587)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:559)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:586)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:559)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:586)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:559)
	at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:705)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:354)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:178)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:173)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:188)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:874)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:548)
	at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:569)
	at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:592)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:689)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:78)
	at org.apache.hadoop.conf.Configuration.getTimeDurationHelper(Configuration.java:1814)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1791)
	at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
	at org.apache.hadoop.util.ShutdownHookManager$HookEntry.<init>(ShutdownHookManager.java:207)
	at org.apache.hadoop.util.ShutdownHookManager.addShutdownHook(ShutdownHookManager.java:302)
	at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:181)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
	at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:153)
	at org.apache.spark.util.ShutdownHookManager$.<init>(ShutdownHookManager.scala:58)
	at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:326)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:343)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1030)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1039)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
	at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:468)
	at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:439)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:516)
	... 21 more


In [26]:
!python PySparkJob.py clickstream.parquet result

21/03/05 23:55:11 WARN Shell: Did not find winutils.exe: {}
java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:548)
	at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:569)
	at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:592)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:689)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:78)
	at org.apache.hadoop.conf.Configuration.getTimeDurationHelper(Configuration.java:1814)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1791)
	at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
	at org.apache.hadoop.util.ShutdownHookManager$HookEntry.<init>(ShutdownHookManager.java:207)
	at org.apache.hadoop.util.ShutdownHookManager.addShutdownHook(ShutdownHookManager.java:302)
	

Input path to file: clickstream.parquet
Target path: result
“бЇҐи­®: Џа®жҐбб, б Ё¤Ґ­вЁдЁЄ в®а®¬ 11292, ¤®зҐа­Ё© Їа®жҐбб  2600, Ўл« § ўҐаиҐ­.
“бЇҐи­®: Џа®жҐбб, б Ё¤Ґ­вЁдЁЄ в®а®¬ 2600, ¤®зҐа­Ё© Їа®жҐбб  11716, Ўл« § ўҐаиҐ­.
“бЇҐи­®: Џа®жҐбб, б Ё¤Ґ­вЁдЁЄ в®а®¬ 11716, ¤®зҐа­Ё© Їа®жҐбб  17244, Ўл« § ўҐаиҐ­.
