# Clean data

In [8]:
!pip install nltk==3.6.5
!pip install sklearn==0.0



In [9]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .getOrCreate()
print(spark)

<pyspark.sql.session.SparkSession object at 0x7faf243976d0>


In [10]:
crypto_df = spark.read.format("mongo").option("uri","mongodb://mongo/seng550.crypto").load()
crypto_df.show()
headers_df = spark.read.format("mongo").option("uri","mongodb://mongo/seng550.headers").load()
headers_df.show()

+--------------------+------------------+----------+------------------+
|                 _id|             close|      date|              open|
+--------------------+------------------+----------+------------------+
|{619f20a05fec62ac...|        61238.6222|2021-10-23| 60690.28731420007|
|{619f20a15fec62ac...| 60694.00000650834|2021-10-22| 62196.96438572152|
|{619f20a15fec62ac...|62298.093007718286|2021-10-21| 65961.04976056097|
|{619f20a15fec62ac...| 65986.30293718613|2021-10-20|        64252.7263|
|{619f20a15fec62ac...|        64217.0839|2021-10-19|        62020.0126|
|{619f20a15fec62ac...|        61953.2331|2021-10-18| 61543.26469764322|
|{619f20a15fec62ac...| 61582.25155249131|2021-10-17| 60867.89483798279|
|{619f20a15fec62ac...| 60860.89183706596|2021-10-16| 61698.03021926293|
|{619f20a15fec62ac...| 61526.33413298988|2021-10-15|57312.383791730215|
|{619f20a15fec62ac...|  57351.7384160736|2021-10-14|        57385.8884|
|{619f20a15fec62ac...|        57357.6067|2021-10-13| 56030.57101

### Clean the crypto dataframe - JD

In [11]:
# https://stackoverflow.com/questions/58823628/call-a-function-for-each-row-of-a-dataframe-in-pysparknon-pandas
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
def get_change(close, open_):
    return  close - open_
spark.udf.register("change_udf", get_change)
# crypto_df.show()
crypto_df = crypto_df.withColumn('change', get_change(col('close'), col('open'))).drop("close","open","_id")
crypto_df.show()

+----------+-------------------+
|      date|             change|
+----------+-------------------+
|2021-10-23|  548.3348857999299|
|2021-10-22|-1502.9643792131828|
|2021-10-21| -3662.956752842685|
|2021-10-20| 1733.5766371861318|
|2021-10-19|  2197.071299999996|
|2021-10-18|  409.9684023567752|
|2021-10-17|   714.356714508518|
|2021-10-16| -837.1383821969721|
|2021-10-15|  4213.950341259668|
|2021-10-14|   -34.149983926407|
|2021-10-13| 1327.0356888160168|
|2021-10-12|-1363.3682153036207|
|2021-10-11|   2703.30516992879|
|2021-10-10|-236.60011243464396|
|2021-10-09|  1139.973213752317|
|2021-10-08| 111.23452299229393|
|2021-10-07|-1506.7725035558396|
|2021-10-06|  3885.388159487484|
|2021-10-05|  2271.338468076465|
|2021-10-04|   990.114499999996|
+----------+-------------------+
only showing top 20 rows



21/12/06 21:12:57 WARN SimpleFunctionRegistry: The function change_udf replaced a previously registered function.


### Clean the headers dataframe - JD

In [12]:

import string
from nltk.stem.porter import PorterStemmer
from sklearn.feature_extraction.text import ENGLISH_STOP_WORDS

def clean_header(header):
    porter = PorterStemmer()
    # headers are small, so local process is ok
    header = header.strip()
    if not header:
        return ''
    header = header.translate(str.maketrans('', '', string.punctuation))
    return ' '.join(porter.stem(word) for word in header.split() if word not in ENGLISH_STOP_WORDS)
udf_clean_header = udf(lambda s: clean_header(s), StringType())
headers_df = headers_df.withColumn('clean_headline', udf_clean_header(col('headline'))).drop("headline","_id")
headers_df.show()

+----------+--------------------+
|      date|      clean_headline|
+----------+--------------------+
|2021-08-01|famili speak raci...|
|2021-08-01|china invas taiwa...|
|2021-08-01|us digit payment ...|
|2021-08-01|‘allow malaysian ...|
|2021-08-01|australia afterpa...|
|2021-08-01|american blast au...|
|2021-08-01|turkey fire ‘like...|
|2021-08-01|horoscop today mo...|
|2021-08-01|horoscop today as...|
|2021-08-01|horoscop today as...|
|2021-08-02|govt decid import...|
|2021-08-02|market live tuesd...|
|2021-08-02|afghan peac effor...|
|2021-08-02|minor girl’ rape ...|
|2021-08-02|john clees ask pl...|
|2021-08-02|afghan fight rage...|
|2021-08-02|afghan presid ash...|
|2021-08-02|insid actor areeb...|
|2021-08-02|jack dorsey outli...|
|2021-08-02|age empir iv late...|
+----------+--------------------+
only showing top 20 rows



### Join - Owen

In [13]:
df = crypto_df.join(headers_df, on='date')
df.show()

[Stage 55:>                                                         (0 + 1) / 1]

+----------+-------------------+--------------------+
|      date|             change|      clean_headline|
+----------+-------------------+--------------------+
|2021-04-06|-1002.7633574389984|asian share eas t...|
|2021-04-06|-1002.7633574389984|patient wale uk r...|
|2021-04-06|-1002.7633574389984|dubai model dodg ...|
|2021-04-06|-1002.7633574389984|two deshaun watso...|
|2021-04-06|-1002.7633574389984|nasa say photo ra...|
|2021-04-06|-1002.7633574389984|arkansa governor ...|
|2021-04-06|-1002.7633574389984|derek chauvin tri...|
|2021-04-06|-1002.7633574389984|australia accus e...|
|2021-04-06|-1002.7633574389984|first covid vacci...|
|2021-04-06|-1002.7633574389984|california pacif ...|
|2021-08-30| -1742.739300000001|australian urg sw...|
|2021-08-30| -1742.739300000001|hurrican veteran ...|
|2021-08-30| -1742.739300000001|california nation...|
|2021-08-30| -1742.739300000001|govt doesn’t acce...|
|2021-08-30| -1742.739300000001|whi serena willia...|
|2021-08-30| -1742.739300000

                                                                                

### Split data - JD

In [14]:
# method 1 https://stackoverflow.com/a/51773836, can only be done on 1 partition
from pyspark.sql.functions import percent_rank
from pyspark.sql import Window

# assign ranks
# TODO better method to avoid this problem - if we do random splits it might be better? 
df = df.withColumn("rank", percent_rank().over(Window.partitionBy().orderBy("date")))
# split
X_train = df.where("rank <= .8").drop("rank","change")
X_train.show()
X_test = df.where("rank > .8").drop("rank","change")
X_test.show()
y_train = df.where("rank <= .8").drop("rank","clean_headline")
y_train.show()
y_test = df.where("rank > .8").drop("rank","clean_headline")
y_test.show()

# method 2 https://stackoverflow.com/a/53193549, needs the number of dates fed in to split
# from pyspark.ml.feature import StringIndexer

# stringIndexer = StringIndexer(inputCol="date", outputCol="index")
# model = stringIndexer.fit(headers_df)
# headers_df = model.transform(headers_df).withColumn("index", col("index").cast("int"))

# headers_df.show()

21/12/06 21:12:59 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
21/12/06 21:12:59 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
21/12/06 21:12:59 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
21/12/06 21:12:59 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
21/12/06 21:12:59 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
21/12/06 21:12:59 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
21/12/06 2

+----------+--------------------+
|      date|      clean_headline|
+----------+--------------------+
|2021-03-01|black amazon exec...|
|2021-03-01|jeff bezo versu m...|
|2021-03-01|michael gudinski ...|
|2021-03-01|new revolutionari...|
|2021-03-01|ladi gaga dog wal...|
|2021-03-01|studi assess caus...|
|2021-03-01|seahawk qb russel...|
|2021-03-01|call perman reope...|
|2021-03-01|an insan blurri t...|
|2021-03-01|one hin leong fou...|
|2021-03-02|white hous pull n...|
|2021-03-02|is pfizer covid v...|
|2021-03-02|hilaria alec bald...|
|2021-03-02|gov abbott execut...|
|2021-03-02|risk breast cance...|
|2021-03-02|texa governor lif...|
|2021-03-02|covid19 vaccin al...|
|2021-03-02|report suggest ro...|
|2021-03-02|who is meghan mar...|
|2021-03-02|rio tinto chairma...|
+----------+--------------------+
only showing top 20 rows



21/12/06 21:12:59 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
21/12/06 21:13:00 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
21/12/06 21:13:00 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
21/12/06 21:13:00 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
21/12/06 21:13:00 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
21/12/06 21:13:00 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+----------+--------------------+
|      date|      clean_headline|
+----------+--------------------+
|2021-09-07|britney spears’ d...|
|2021-09-07|leylah fernandez ...|
|2021-09-07|taliban air scatt...|
|2021-09-07|texa abort ‘whist...|
|2021-09-07|new sharktooth di...|
|2021-09-07|covid19 huge vacc...|
|2021-09-07|samsung galaxi no...|
|2021-09-07|in flood manvil n...|
|2021-09-07|morn brief top st...|
|2021-09-07|britney spear fat...|
|2021-09-08|how support a fri...|
|2021-09-08|tropic storm mind...|
|2021-09-08|pediatr covid cas...|
|2021-09-08|mcconnel send lot...|
|2021-09-08|taliban cabinet i...|
|2021-09-08|karnal stir mirro...|
|2021-09-08|air new zealand a...|
|2021-09-08|labor depart tout...|
|2021-09-08|covid19 breakthro...|
|2021-09-08|scott morrison se...|
+----------+--------------------+
only showing top 20 rows



21/12/06 21:13:00 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
21/12/06 21:13:00 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
21/12/06 21:13:00 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
21/12/06 21:13:00 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+----------+------------------+
|      date|            change|
+----------+------------------+
|2021-03-01| 4448.866828906001|
|2021-03-01| 4448.866828906001|
|2021-03-01| 4448.866828906001|
|2021-03-01| 4448.866828906001|
|2021-03-01| 4448.866828906001|
|2021-03-01| 4448.866828906001|
|2021-03-01| 4448.866828906001|
|2021-03-01| 4448.866828906001|
|2021-03-01| 4448.866828906001|
|2021-03-01| 4448.866828906001|
|2021-03-02|-1344.387735297998|
|2021-03-02|-1344.387735297998|
|2021-03-02|-1344.387735297998|
|2021-03-02|-1344.387735297998|
|2021-03-02|-1344.387735297998|
|2021-03-02|-1344.387735297998|
|2021-03-02|-1344.387735297998|
|2021-03-02|-1344.387735297998|
|2021-03-02|-1344.387735297998|
|2021-03-02|-1344.387735297998|
+----------+------------------+
only showing top 20 rows



21/12/06 21:13:01 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
21/12/06 21:13:01 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
21/12/06 21:13:01 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
21/12/06 21:13:01 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
21/12/06 21:13:01 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
21/12/06 21:13:01 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+----------+------------------+
|      date|            change|
+----------+------------------+
|2021-09-07|        -5786.6247|
|2021-09-07|        -5786.6247|
|2021-09-07|        -5786.6247|
|2021-09-07|        -5786.6247|
|2021-09-07|        -5786.6247|
|2021-09-07|        -5786.6247|
|2021-09-07|        -5786.6247|
|2021-09-07|        -5786.6247|
|2021-09-07|        -5786.6247|
|2021-09-07|        -5786.6247|
|2021-09-08|-805.3629999999976|
|2021-09-08|-805.3629999999976|
|2021-09-08|-805.3629999999976|
|2021-09-08|-805.3629999999976|
|2021-09-08|-805.3629999999976|
|2021-09-08|-805.3629999999976|
|2021-09-08|-805.3629999999976|
|2021-09-08|-805.3629999999976|
|2021-09-08|-805.3629999999976|
|2021-09-08|-805.3629999999976|
+----------+------------------+
only showing top 20 rows

