In [1]:
import findspark
findspark.init()

In [2]:
import pyspark

In [3]:
import pyspark.sql.functions as F

In [4]:
from pyspark.sql.types import *

In [5]:
import pandas as pd

In [16]:
# Reuse the live SparkContext when this cell is re-run: constructing a second
# SparkContext in the same kernel raises an error, while getOrCreate() returns
# the existing one (or builds it from this configuration on a fresh kernel).
# NOTE(review): the master URL is hard-coded to a private address; consider
# moving it to a configuration cell or environment variable.
sc = pyspark.SparkContext.getOrCreate(
    conf=pyspark.SparkConf()
    .setMaster("spark://192.168.1.2:7077")
    .setAppName("Pi")
)

In [17]:
# Display the SparkContext to confirm it was created.
sc

In [7]:
# SQL entry point bound to the SparkContext; used below to build DataFrames.
sqlc = pyspark.sql.SQLContext(sparkContext=sc)

In [71]:
# Load only the lyrics column, and only the first three rows, into pandas.
songs = pd.read_csv(
    '../songdata.csv',
    usecols=['text'],
    nrows=3,
)

In [72]:
# Convert the pandas frame into a Spark DataFrame (rebinds `songs`).
songs = sqlc.createDataFrame(data=songs)

In [73]:
# Preview at most five rows of the Spark DataFrame.
songs.show(n=5)

+--------------------+
|                text|
+--------------------+
|Look at her face,...|
|Take it easy with...|
|I'll never know w...|
+--------------------+



In [74]:
def scanning(text, length):
    """Slide a window over ``text`` and pair each window with the one after it.

    For every start offset ``i`` at which two consecutive windows of
    ``length`` characters fit inside ``text``, the slice
    ``text[i:i+length]`` is emitted on the encoder side and
    ``text[i+length:i+2*length]`` on the decoder side, each as a list of
    single-character strings.

    Args:
        text: String to scan. ``None`` (for example a null DataFrame cell
            passed in by a UDF) is treated as having no windows.
        length: Number of characters per window; must be at least 1.

    Returns:
        A tuple ``(encoder_input, decoder_input)`` of two equally long lists
        of character lists. Both are empty when ``text`` is shorter than
        ``2 * length``.

    Raises:
        ValueError: If ``length`` is less than 1.
    """
    if length < 1:
        # A zero or negative window produces empty or misaligned slices.
        raise ValueError("length must be a positive integer, got %r" % (length,))
    if text is None:
        # len(None) would raise TypeError; return the empty pair instead.
        return [], []

    # Number of start offsets that leave room for both windows (may be <= 0).
    window_count = len(text) - 2 * length + 1
    encoder_input = [list(text[i:i + length]) for i in range(window_count)]
    decoder_input = [
        list(text[i + length:i + 2 * length]) for i in range(window_count)
    ]
    return encoder_input, decoder_input

In [75]:
# Return type of the scanning UDF: a struct with two non-nullable fields, each
# a non-null array of non-null arrays of strings.
schema = StructType([
    StructField(
        name=field_name,
        dataType=ArrayType(
            ArrayType(StringType(), containsNull=False),
            containsNull=False,
        ),
        nullable=False,
    )
    for field_name in ('for_encoder', 'for_decoder')
])

In [76]:
# Expose scanning() as a Spark UDF with the window length fixed at 10
# characters; `schema` declares the struct the UDF returns.
scanning_udf = F.udf(
    lambda lyrics: scanning(lyrics, 10),
    returnType=schema,
)

In [77]:
# Apply the UDF to the lyrics column; the struct result is stored as `lists`.
song_lists = songs.select(
    scanning_udf(songs['text']).alias('lists')
)

In [78]:
# Print the schema of the UDF output to check it matches the declared struct.
song_lists.printSchema()

root
 |-- lists: struct (nullable = true)
 |    |-- for_encoder: array (nullable = false)
 |    |    |-- element: array (containsNull = false)
 |    |    |    |-- element: string (containsNull = false)
 |    |-- for_decoder: array (nullable = false)
 |    |    |-- element: array (containsNull = false)
 |    |    |    |-- element: string (containsNull = false)



In [79]:
# Pull the two struct fields up into top-level columns.
en_de = song_lists.select(
    F.col('lists.for_encoder'),
    F.col('lists.for_decoder'),
)

In [80]:
# List (column, type) pairs for the flattened frame.
en_de.dtypes

[('for_encoder', 'array<array<string>>'),
 ('for_decoder', 'array<array<string>>')]

In [81]:
# Keep only the encoder column and drop down to the RDD API.
en_rdd = en_de.select(en_de['for_encoder']).rdd

In [82]:
# Flatten twice, then bring every window back to the driver as one list.
rdd_reduce = (
    en_rdd
    .flatMap(lambda row: row)          # row -> its single field, the list of windows
    .flatMap(lambda windows: windows)  # list of windows -> individual windows
    .collect()
)

In [83]:
# Check the Python type of the collected result.
type(rdd_reduce)

list

In [84]:
# Total number of encoder windows collected across all rows.
len(rdd_reduce)

3615

In [85]:
# Inspect the first encoder window (a list of single characters).
rdd_reduce[0]

['L', 'o', 'o', 'k', ' ', 'a', 't', ' ', 'h', 'e']

In [86]:
# Inspect the second encoder window; it starts one character after the first.
rdd_reduce[1]

['o', 'o', 'k', ' ', 'a', 't', ' ', 'h', 'e', 'r']

In [87]:
# Inspect the third encoder window, shifted by one more character.
rdd_reduce[2]

['o', 'k', ' ', 'a', 't', ' ', 'h', 'e', 'r', ' ']

In [15]:
# Stop the SparkContext once the notebook is finished with it.
sc.stop()