In [82]:
from pyspark.sql import SparkSession, Row, SQLContext
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.streaming import StreamingContext

from pyspark import SparkContext, SparkConf

In [83]:
sc = SparkContext.getOrCreate()

### SQLContext
SQLContext - это точка входа в Spark SQL, который представляет собой модуль для обработки структурированных данных. 
После инициализации SQLContext можно использовать его для выполнения различных “sql-подобных” операций 

In [84]:
from pyspark.sql import SQLContext

sql_context = SQLContext(sc)

### SparkSession
Spark 2.0 представила новую точку входа под названием Spark Session, которая по существу заменила как SQLContext, так и HiveContext.

In [85]:
from pyspark.sql import SparkSession
spark_session = SparkSession \
    .builder \
    .enableHiveSupport() \
    .getOrCreate()
# Two ways you can access spark context from spark session
spark_context = spark_session._sc
spark_context = spark_session.sparkContext

In [86]:
def getDStream(spark,batch_interval=5):
    sc=spark.sparkContext
    sc.setLogLevel("ERROR")

    #Create streaming context, with required batch interval
    ssc = StreamingContext(sc, batch_interval)

    #Checkpointing needed for stateful transforms
    ssc.checkpoint("checkpoint")
    
    # Create a DStream that represents streaming data from a network socket
    # See https://spark.apache.org/docs/latest/streaming-programming-guide.html#a-quick-example
    dstream = ssc.socketTextStream("localhost", 9999)
    
    return [sc,ssc,dstream]

In [87]:
[sc,ssc,dstream]=getDStream(spark=spark_session,batch_interval=10)

dstream.pprint()

In [88]:
import os
from os.path import isfile, join 

loc = os.path.abspath("")
data_loc = f"{loc}/data/"

In [89]:
emp = [(1, "AAA", "dept1", 1000, "2022-02-01 15:12:13"),
    (2, "BBB", "dept1", 1100, "2022-04-01 5:12:3"),
    (3, "CCC", "dept1", 3000, "2022-06-05 1:2:13"),
    (4, "DDD", "dept1", 1500, "2022-08-10 10:52:53"),
    (5, "EEE", "dept2", 8000, "2022-01-11 5:52:43"),
    (6, "FFF", "dept2", 7200, "2022-04-14 19:32:33"),
    (7, "GGG", "dept3", 7100, "2022-02-21 15:42:43"),
    (8, "HHH", "dept3", 3700, "2022-09-25 15:32:33"),
    (9, "III", "dept3", 4500, "2022-10-15 15:22:23"),
    (10, "JJJ", "dept5", 3400, "2022-12-17 15:14:17")]
empdf = spark_session.createDataFrame(emp, ["id", "name", "dept", "salary", "date"])

In [90]:
df = (empdf
    .select("date")
    .withColumn("next_month", F.add_months("date", 1)))
df.show(5)

+-------------------+----------+
|               date|next_month|
+-------------------+----------+
|2022-02-01 15:12:13|2022-03-01|
|  2022-04-01 5:12:3|2022-05-01|
|  2022-06-05 1:2:13|2022-07-05|
|2022-08-10 10:52:53|2022-09-10|
| 2022-01-11 5:52:43|2022-02-11|
+-------------------+----------+
only showing top 5 rows



### Работа с заготовленным датасетом

In [91]:
df = spark_session.read.csv("/content/dataset.csv", inferSchema=True, header=True)
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- ticker: string (nullable = true)
 |-- growth: string (nullable = true)
 |-- title: string (nullable = true)
 |-- flair: string (nullable = true)
 |-- score: integer (nullable = true)
 |-- upvote_ratio: double (nullable = true)
 |-- author: string (nullable = true)
 |-- num_comments: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- created: timestamp (nullable = true)
 |-- url: string (nullable = true)
 |-- timestamp: integer (nullable = true)



In [92]:
df.show(2)

+------+------+------+--------------------+----------+-----+------------+--------------------+------------+--------------------+-------------------+--------------------+----------+
|    id|ticker|growth|               title|     flair|score|upvote_ratio|              author|num_comments|                text|            created|                 url| timestamp|
+------+------+------+--------------------+----------+-----+------------+--------------------+------------+--------------------+-------------------+--------------------+----------+
|krsa5c|  None|   N/A|       update $ yolo|      YOLO|   15|         1.0|            dhsmatt2|         290|all i am still ho...|2021-01-06 00:00:00|https://www.reddi...|1609909200|
|krxpdq|  None|   N/A|upcoming nio day ...|Discussion|    9|         1.0|Fancy-Blackberry-332|          60|nio day is coming...|2021-01-06 00:00:00|https://www.reddi...|1609909200|
+------+------+------+--------------------+----------+-----+------------+--------------------+-

In [93]:
df_crop = df.select("author", "flair", "score", "upvote_ratio")
df_crop.show(5)

+--------------------+----------+-----+------------+
|              author|     flair|score|upvote_ratio|
+--------------------+----------+-----+------------+
|            dhsmatt2|      YOLO|   15|         1.0|
|Fancy-Blackberry-332|Discussion|    9|         1.0|
|           JKK201519|      YOLO|    5|        0.86|
|             drtywlf|Discussion|    8|         1.0|
|             MC_lgnd|Discussion|    6|        0.88|
+--------------------+----------+-----+------------+
only showing top 5 rows



### Train/test split

In [94]:
train, test = df_crop.randomSplit([0.7, 0.3], seed=7)

In [95]:
print(f"Train set length: {train.count()} timestamps")
print(f"Test set length: {test.count()} timestamps")

Train set length: 536 timestamps
Test set length: 252 timestamps


In [96]:
train.show(2)

+------+----------+-----+------------+
|author|     flair|score|upvote_ratio|
+------+----------+-----+------------+
|#NAME?|Discussion|   16|         1.0|
|#NAME?|      News|   15|        0.76|
+------+----------+-----+------------+
only showing top 2 rows



### Dtypes
Нам нужно определить, какие столбцы являются числовыми, а какие категориальными.

In [97]:
train.dtypes

[('author', 'string'),
 ('flair', 'string'),
 ('score', 'int'),
 ('upvote_ratio', 'double')]

In [98]:
catCols = [x for (x, dataType) in train.dtypes if dataType == "string"]
numCols = [
    x for (x, dataType) in train.dtypes if ((dataType == "double" | dataType == "int") & (x != "isFraud"))
]

In [99]:
print(numCols)
print(catCols)

['upvote_ratio']
['author', 'flair']


### One hot encoding

In [100]:
train.agg(F.countDistinct("author")).show()

+-------------+
|count(author)|
+-------------+
|          521|
+-------------+



In [101]:
df.agg(F.countDistinct("id")).show()

+---------+
|count(id)|
+---------+
|      786|
+---------+



In [102]:
train.groupBy("flair").count().show()

+------------------+-----+
|             flair|count|
+------------------+-----+
|        Discussion|  291|
|              YOLO|   76|
|              Mods|    1|
|  Daily Discussion|    3|
|              Meme|    3|
|                DD|   67|
|              Gain|   28|
|Weekend Discussion|    1|
|             Chart|    5|
|              Loss|    5|
|              News|   56|
+------------------+-----+



In [103]:
from pyspark.ml.feature import (
    OneHotEncoder,
    StringIndexer,
)

In [104]:
string_indexer = [
    StringIndexer(inputCol=x, outputCol=x + "_StringIndexer", handleInvalid="skip")
    for x in catCols
]

In [105]:
string_indexer

[StringIndexer_e975c584bcdc, StringIndexer_c1b78d5a254f]

In [106]:
one_hot_encoder = [
    OneHotEncoder(
        inputCols=[f"{x}_StringIndexer" for x in catCols],
        outputCols=[f"{x}_OneHotEncoder" for x in catCols],
    )
]

In [107]:
one_hot_encoder

[OneHotEncoder_4ee91cac41f3]

### Vector assembling
Объединяет значения входных столбцов в единый вектор


In [108]:
from pyspark.ml.feature import VectorAssembler

In [109]:
assemblerInput = [x for x in numCols]
assemblerInput += [f"{x}_OneHotEncoder" for x in catCols]

In [110]:
assemblerInput

['upvote_ratio', 'author_OneHotEncoder', 'flair_OneHotEncoder']

In [111]:
vector_assembler = VectorAssembler(
    inputCols=assemblerInput, outputCol="VectorAssembler_features"
)

In [112]:
stages = []
stages += string_indexer
stages += one_hot_encoder
stages += [vector_assembler]


In [113]:
stages

[StringIndexer_e975c584bcdc,
 StringIndexer_c1b78d5a254f,
 OneHotEncoder_4ee91cac41f3,
 VectorAssembler_f4c7b01ba3a4]

In [114]:
%%time
from pyspark.ml import Pipeline

pipeline = Pipeline().setStages(stages)
model = pipeline.fit(train)

pp_df = model.transform(test)

CPU times: user 41.4 ms, sys: 12 ms, total: 53.4 ms
Wall time: 1.06 s


In [115]:
pp_df.select(
    "author_OneHotEncoder", "author", "author_StringIndexer", "flair_OneHotEncoder", "flair_StringIndexer",
).show(truncate=False)

+--------------------+--------------------+--------------------+-------------------+-------------------+
|author_OneHotEncoder|author              |author_StringIndexer|flair_OneHotEncoder|flair_StringIndexer|
+--------------------+--------------------+--------------------+-------------------+-------------------+
|(520,[0],[1.0])     |AutoModerator       |0.0                 |(10,[7],[1.0])     |7.0                |
|(520,[60],[1.0])    |CompetitionForward67|60.0                |(10,[0],[1.0])     |0.0                |
|(520,[64],[1.0])    |CovidLiveFL         |64.0                |(10,[1],[1.0])     |1.0                |
|(520,[137],[1.0])   |Lucky-Designer3469  |137.0               |(10,[0],[1.0])     |0.0                |
|(520,[214],[1.0])   |Simplifyingication  |214.0               |(10,[4],[1.0])     |4.0                |
|(520,[226],[1.0])   |Swiss-Rock          |226.0               |(10,[0],[1.0])     |0.0                |
|(520,[233],[1.0])   |Tereek7504          |233.0       

In [116]:
pp_df.select("VectorAssembler_features").show(5)

+------------------------+
|VectorAssembler_features|
+------------------------+
|    (531,[0,1,528],[1...|
|    (531,[0,61,521],[...|
|    (531,[0,65,522],[...|
|    (531,[0,138,521],...|
|    (531,[0,215,525],...|
+------------------------+
only showing top 5 rows

