In [1]:
#RIZAL CODES PYSPARK

import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from time import time


from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import HiveContext, SQLContext
import sys, os
import shutil
import csv

from pyspark.sql.functions import *





#initialize the spark session
conf = SparkConf().setAppName('TextToTable')
#sc = SparkContext(conf=conf)
#sqlContext = SQLContext(sc)
#sc.setLogLevel('ERROR')

spark = SparkSession \
    .builder \
    .appName("Spark-getDMP")\
	.config("spark.some.config.option", "some-value")\
    .getOrCreate()

In [2]:
#================== read txt & save to csv ================


def readmytxt(filename,separator,mylist):
    
    print'\n========== ',filename,' ===================='
    myrdd = sc.textFile(filename)\
            .map(lambda l: l.split(separator))
    df = sqlContext.createDataFrame(myrdd)
    
    for i in range(len(df.schema.names)):      
        df = df.withColumnRenamed(df.schema.names[i],mylist[i])

    df.show(5)
    
    
   
    #write databricks csv 
    myfoldername = filename.split('.')[0]
    """
    if os.path.isdir(myfoldername):shutil.rmtree(myfoldername)
    df.coalesce(1)\
    .write\
    .format("com.databricks.spark.csv")\
    .option("header", "true")\
    .save(myfoldername)
    print 'databricks:write csv done!'
    
     """
    
    #write pandas csv 
    csvfilename = myfoldername +'.csv'
    df_pd = df.toPandas()
    df_pd.to_csv(csvfilename, quoting=csv.QUOTE_NONNUMERIC, index = False, encoding = 'utf-8')
    print csvfilename,': WRITE PANDAS TO CSV DONE!'
    
    
    
    return df





    
    
mv_title_meta = readmytxt("movie_titles_metadata.txt",
                 " +++$+++ ",
                 ['movieID','movieTitle','movieYear','IMDB_Rating','IMDB_n_votes','movie_genre'])



mv_char_meta = readmytxt("movie_characters_metadata.txt",
                 " +++$+++ ",
                 ['characterID','characterName','movieID','movieTitle','gender','pos_credit'])





mv_line = readmytxt("movie_lines.txt",
                 " +++$+++ ",
                 ['lineID','characterID','movieID','characterName','utter'])






mv_conv = readmytxt("movie_conversations.txt",
                 " +++$+++ ",
                 ['characterID_1','characterID_2','movieID','utter_list'])





raw_scr_url = readmytxt("raw_script_urls.txt",
                 " +++$+++ ",
                 ['movieID','movieTitle','url'])






+-------+--------------------+---------+-----------+------------+--------------------+
|movieID|          movieTitle|movieYear|IMDB_Rating|IMDB_n_votes|         movie_genre|
+-------+--------------------+---------+-----------+------------+--------------------+
|     m0|10 things i hate ...|     1999|       6.90|       62847|['comedy', 'roman...|
|     m1|1492: conquest of...|     1992|       6.20|       10421|['adventure', 'bi...|
|     m2|          15 minutes|     2001|       6.10|       25854|['action', 'crime...|
|     m3|2001: a space ody...|     1968|       8.40|      163227|['adventure', 'my...|
|     m4|             48 hrs.|     1982|       6.90|       22289|['action', 'comed...|
+-------+--------------------+---------+-----------+------------+--------------------+
only showing top 5 rows

movie_titles_metadata.csv : WRITE PANDAS TO CSV DONE!

+-----------+-------------+-------+--------------------+------+----------+
|characterID|characterName|movieID|          movieTitle|gende

In [3]:
#=================  pyspark ==================
from pyspark.sql.functions import col, lower, regexp_replace, split

def clean_text(c):
  c = lower(c)
  c = regexp_replace(c, "^rt ", "")
  c = regexp_replace(c, "(https?\://)\S+", "")
  c = regexp_replace(c, "[^a-zA-Z0-9\\s]", "")
  #c = split(c, "\\s+") tokenization...
  return c

clean_text_df = mv_line.select(clean_text(col("utter")).alias("utter"))

clean_text_df.printSchema()
clean_text_df.show(10)

root
 |-- utter: string (nullable = true)

+--------------------+
|               utter|
+--------------------+
|         they do not|
|          they do to|
|           i hope so|
|            she okay|
|             lets go|
|                 wow|
|okay  youre gonna...|
|                  no|
|im kidding  you k...|
|like my fear of w...|
+--------------------+
only showing top 10 rows



In [4]:
from pyspark.ml.feature import Tokenizer

tokenizer = Tokenizer(inputCol="utter", outputCol="vector")
vector_df = tokenizer.transform(clean_text_df).select("vector")

vector_df.printSchema()
vector_df.show(10)

root
 |-- vector: array (nullable = true)
 |    |-- element: string (containsNull = true)

+--------------------+
|              vector|
+--------------------+
|     [they, do, not]|
|      [they, do, to]|
|       [i, hope, so]|
|         [she, okay]|
|          [lets, go]|
|               [wow]|
|[okay, , youre, g...|
|                [no]|
|[im, kidding, , y...|
|[like, my, fear, ...|
+--------------------+
only showing top 10 rows



In [5]:
from pyspark.ml.feature import StopWordsRemover

# Define a list of stop words or use default list
remover = StopWordsRemover()
stopwords = remover.getStopWords() 


remover.setInputCol("vector")
remover.setOutputCol("vector_no_stopw")

# Transform existing dataframe with the StopWordsRemover
vector_no_stopw_df = remover.transform(vector_df).select("vector_no_stopw")

# Display
vector_no_stopw_df.printSchema()
vector_no_stopw_df.show()

root
 |-- vector_no_stopw: array (nullable = true)
 |    |-- element: string (containsNull = true)

+--------------------+
|     vector_no_stopw|
+--------------------+
|                  []|
|                  []|
|              [hope]|
|              [okay]|
|          [lets, go]|
|               [wow]|
|[okay, , youre, g...|
|                  []|
|[im, kidding, , k...|
|[like, fear, wear...|
|              [real]|
|       [good, stuff]|
|[figured, youd, g...|
|[thank, god, , he...|
|[, endless, blond...|
|              [crap]|
|      [listen, crap]|
|                  []|
|[guillermo, says,...|
|   [always, selfish]|
+--------------------+
only showing top 20 rows



In [6]:
# Import stemmer library
from nltk.stem.porter import *

# Instantiate stemmer object
stemmer = PorterStemmer()

# Quick test of the stemming function
tokens = ["thanks", "its", "proverbially", "unexpected", "running"]
for t in tokens:
  print(stemmer.stem(t))

thank
it
proverbi
unexpect
run


In [7]:
# Create stemmer python function
def stem(in_vec):
    out_vec = []
    for t in in_vec:
        t_stem = stemmer.stem(t)
        if len(t_stem) > 2:
            out_vec.append(t_stem)       
    return out_vec



# Create user defined function for stemming with return type Array<String>
from pyspark.sql.types import *
stemmer_udf = udf(lambda x: stem(x), ArrayType(StringType()))




# Create new df with vectors containing the stemmed tokens 
vector_stemmed_df = (
    vector_no_stopw_df
        .withColumn("vector_stemmed", stemmer_udf("vector_no_stopw"))
        .select("vector_stemmed")
  )

# Rename df and column for clarity
production_df = vector_stemmed_df.select(col("vector_stemmed").alias("unigrams"))

# Display
production_df.printSchema()
production_df.show()

root
 |-- unigrams: array (nullable = true)
 |    |-- element: string (containsNull = true)

+--------------------+
|            unigrams|
+--------------------+
|                  []|
|                  []|
|              [hope]|
|              [okay]|
|               [let]|
|               [wow]|
|[okay, your, gonn...|
|                  []|
|[kid, know, somet...|
|[like, fear, wear...|
|              [real]|
|       [good, stuff]|
|[figur, youd, get...|
|[thank, god, hear...|
|[endless, blond, ...|
|              [crap]|
|      [listen, crap]|
|                  []|
|[guillermo, say, ...|
|    [alway, selfish]|
+--------------------+
only showing top 20 rows



In [8]:
from pyspark.ml.feature import NGram

# Define NGram transformer
ngram = NGram(n=2, inputCol="unigrams", outputCol="bigrams")

# Create bigram_df as a transform of unigram_df using NGram tranformer
production_df = ngram.transform(production_df)

# Display
production_df.printSchema()
production_df.show()

root
 |-- unigrams: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- bigrams: array (nullable = true)
 |    |-- element: string (containsNull = false)

+--------------------+--------------------+
|            unigrams|             bigrams|
+--------------------+--------------------+
|                  []|                  []|
|                  []|                  []|
|              [hope]|                  []|
|              [okay]|                  []|
|               [let]|                  []|
|               [wow]|                  []|
|[okay, your, gonn...|[okay your, your ...|
|                  []|                  []|
|[kid, know, somet...|[kid know, know s...|
|[like, fear, wear...|[like fear, fear ...|
|              [real]|                  []|
|       [good, stuff]|        [good stuff]|
|[figur, youd, get...|[figur youd, youd...|
|[thank, god, hear...|[thank god, god h...|
|[endless, blond, ...|[endless blond, b...|
|              [crap]|      

In [9]:
from pyspark.ml.feature import NGram

# Define NGram transformer
ngram = NGram(n=3, inputCol="unigrams", outputCol="trigrams")

# Create bigram_df as a transform of unigram_df using NGram tranformer
production_df = ngram.transform(production_df)

# Display
production_df.printSchema()
production_df.show()

root
 |-- unigrams: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- bigrams: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- trigrams: array (nullable = true)
 |    |-- element: string (containsNull = false)

+--------------------+--------------------+--------------------+
|            unigrams|             bigrams|            trigrams|
+--------------------+--------------------+--------------------+
|                  []|                  []|                  []|
|                  []|                  []|                  []|
|              [hope]|                  []|                  []|
|              [okay]|                  []|                  []|
|               [let]|                  []|                  []|
|               [wow]|                  []|                  []|
|[okay, your, gonn...|[okay your, your ...|[okay your gonna,...|
|                  []|                  []|                  []|
|[kid, know, some

In [10]:
from pyspark.sql.functions import monotonically_increasing_id


mv_line = mv_line.withColumn("id", monotonically_increasing_id())

production_df = production_df.withColumn("id", monotonically_increasing_id())

mv_line.show()

production_df.show()

mv_line = production_df.join(mv_line, "id", "outer").drop("id")

mv_line.show()

+------+-----------+-------+-------------+--------------------+---+
|lineID|characterID|movieID|characterName|               utter| id|
+------+-----------+-------+-------------+--------------------+---+
| L1045|         u0|     m0|       BIANCA|        They do not!|  0|
| L1044|         u2|     m0|      CAMERON|         They do to!|  1|
|  L985|         u0|     m0|       BIANCA|          I hope so.|  2|
|  L984|         u2|     m0|      CAMERON|           She okay?|  3|
|  L925|         u0|     m0|       BIANCA|           Let's go.|  4|
|  L924|         u2|     m0|      CAMERON|                 Wow|  5|
|  L872|         u0|     m0|       BIANCA|Okay -- you're go...|  6|
|  L871|         u2|     m0|      CAMERON|                  No|  7|
|  L870|         u0|     m0|       BIANCA|I'm kidding.  You...|  8|
|  L869|         u0|     m0|       BIANCA|Like my fear of w...|  9|
|  L868|         u2|     m0|      CAMERON|     The "real you".| 10|
|  L867|         u0|     m0|       BIANCA|    Wh

In [11]:
df_pd = mv_line.toPandas()
df_pd.to_csv('mv_line_final.csv', quoting=csv.QUOTE_NONNUMERIC, index = False, encoding = 'utf-8')
print ': WRITE PANDAS TO CSV DONE!'

: WRITE PANDAS TO CSV DONE!
