In [4]:
import findspark
findspark.init()
import pyspark
import random
sc = pyspark.SparkContext(appName="Pi")
num_samples = 100000000
def inside(p):     
  x, y = random.random(), random.random()
  return x*x + y*y < 1
count = sc.parallelize(range(0, num_samples)).filter(inside).count()
pi = 4 * count / num_samples
print(pi)
sc.stop()

3.14165616


In [5]:
import findspark
findspark.init()

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SparkSession

from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.appName("test").getOrCreate()

sentenceDataFrame = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")

regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps(False)

countTokens = udf(lambda words: len(words), IntegerType())


tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")\
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)

regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words") \
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)

+-----------------------------------+------------------------------------------+------+
|sentence                           |words                                     |tokens|
+-----------------------------------+------------------------------------------+------+
|Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |
|I wish Java could use case classes |[i, wish, java, could, use, case, classes]|7     |
|Logistic,regression,models,are,neat|[logistic,regression,models,are,neat]     |1     |
+-----------------------------------+------------------------------------------+------+

+-----------------------------------+------------------------------------------+------+
|sentence                           |words                                     |tokens|
+-----------------------------------+------------------------------------------+------+
|Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |
|I wish Java could use case cla

In [16]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

# Prepare training documents from a list of (id, text, label) tuples.
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])

# Configure an ML pipeline, which consists of one stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# Fit the pipeline to training documents.
model = pipeline.fit(training)

# Prepare test documents, which are unlabeled (id, text) tuples.11111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])

# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    rid, text, prob, prediction = row
    print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), prediction))

(4, spark i j k) --> prob=[0.334518607597,0.665481392403], prediction=1.000000
(5, l m n) --> prob=[0.668102477307,0.331897522693], prediction=0.000000
(6, spark hadoop spark) --> prob=[0.111525601523,0.888474398477], prediction=1.000000
(7, apache hadoop) --> prob=[0.668102477307,0.331897522693], prediction=0.000000


In [14]:
import findspark
findspark.init()

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SparkSession

df = spark.read.load("/Users/yiwang/Documents/YiWang/Ebiz/Task 15/attributes.csv", format="csv")

df.show()

+------+--------------------+--------------------+
|   _c0|                 _c1|                 _c2|
+------+--------------------+--------------------+
|100001|            Bullet01|Versatile connect...|
|100001|            Bullet02|Stronger than ang...|
|100001|            Bullet03|Help ensure joint...|
|100001|            Bullet04|Dimensions: 3 in....|
|100001|            Bullet05|Made from 12-Gaug...|
|100001|            Bullet06|Galvanized for ex...|
|100001|            Bullet07|Install with 10d ...|
|100001|               Gauge|                  12|
|100001|            Material|    Galvanized Steel|
|100001|      MFG Brand Name|  Simpson Strong-Tie|
|100001|    Number of Pieces|                   1|
|100001| Product Depth (in.)|                 1.5|
|100001|Product Height (in.)|                   3|
|100001|Product Weight (lb.)|                0.26|
|100001| Product Width (in.)|                   3|
|100002|  Application Method|  Brush,Roller,Spray|
|100002|Assembled Depth (...|  

In [83]:
import findspark
findspark.init()

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType,StringType,IntegerType

sc = spark.sparkContext
sql_sc = SQLContext(sc)

trainSchema = StructType([
    StructField("id", IntegerType()),
    StructField("pid", IntegerType()),
    StructField("title", StringType()),
    StructField("term", StringType()),
    StructField("score", DoubleType())
])

titleSchema = StructType([
    StructField("pid", IntegerType()),
    StructField("title", StringType())
])

descriptionSchema = StructType([
    StructField("pid", IntegerType()),
    StructField("description", StringType())
])

attrSchema = StructType([
    StructField("pid", IntegerType()),
    StructField("name", StringType()),
    StructField("value", StringType()),
])

title = sql_sc.read.format("com.databricks.spark.csv").option("header","true").schema(trainSchema).load("/Users/yiwang/Documents/YiWang/Ebiz/Task 15/train.csv")
train = sql_sc.read.format("com.databricks.spark.csv").option("header","true").schema(titleSchema).load("/Users/yiwang/Documents/YiWang/Ebiz/Task 15/RawTrain.csv")
test=sql_sc.read.format("com.databricks.spark.csv").option("header","true")    .schema(dataSchema).load("/Users/yiwang/Documents/YiWang/Ebiz/Task 15/test.csv")
attr = sql_sc.read.format("com.databricks.spark.csv").option("header","true").schema(attrSchema).load("/Users/yiwang/Documents/YiWang/Ebiz/Task 15/attributes.csv")

description = sql_sc.read.format("com.databricks.spark.csv").option("header","true").schema(descriptionSchema).load("/Users/yiwang/Documents/YiWang/Ebiz/Task 15/product_descriptions.csv")
title= title.drop(title.id)
title.show(10)
description.show(10)


attr.createOrReplaceTempView("attr")
#get brand, color and material
brand = sql_sc.sql("SELECT pid,value as brand from attr where name = 'MFG Brand Name'")
material = sql_sc.sql("SELECT pid,value as material from attr where name = 'Material'")
color = sql_sc.sql("SELECT pid,value as color from attr where name = 'Color Family'")

#result=train.union(test)
title=title.join(description, title.pid == description.pid, "left").drop(description.pid)
title=title.join(brand, title.pid == brand.pid, "left").drop(brand.pid)
title=title.join(material, title.pid== material.pid,"left").drop(material.pid)
title=title.join(color, title.pid == color.pid,"left").drop(color.pid)

title.show(10)


+------+--------------------+--------------------+-----+
|   pid|               title|                term|score|
+------+--------------------+--------------------+-----+
|100001|Simpson Strong-Ti...|       angle bracket|  3.0|
|100001|Simpson Strong-Ti...|           l bracket|  2.5|
|100002|BEHR Premium Text...|           deck over|  3.0|
|100005|Delta Vero 1-Hand...|    rain shower head| 2.33|
|100005|Delta Vero 1-Hand...|  shower only faucet| 2.67|
|100006|Whirlpool 1.9 cu....|      convection otr|  3.0|
|100006|Whirlpool 1.9 cu....|microwave over stove| 2.67|
|100006|Whirlpool 1.9 cu....|          microwaves|  3.0|
|100007|Lithonia Lighting...|     emergency light| 2.67|
|100009|House of Fara 3/4...|             mdf 3/4|  3.0|
+------+--------------------+--------------------+-----+
only showing top 10 rows

+------+--------------------+
|   pid|         description|
+------+--------------------+
|100002|BEHR Premium Text...|
|100003|Classic architect...|
|100004|The Grape Solar 2.

In [84]:
from pyspark.sql.functions import col, when
title=title.withColumn(
    "color", when(col("color").isNull(), "empty").otherwise(col("color")))
title=title.withColumn(
    "brand", when(col("brand").isNull(), "empty").otherwise(col("brand")))
title=title.withColumn(
    "material", when(col("material").isNull(), "empty").otherwise(col("material")))
title=title.withColumn(
    "description", when(col("description").isNull(), "empty").otherwise(col("description")))


title.show(10)

+------+--------------------+--------------------+-----+--------------------+------------+--------------+-----------------+
|   pid|               title|                term|score|         description|       brand|      material|            color|
+------+--------------------+--------------------+-----+--------------------+------------+--------------+-----------------+
|100170|Dyna-Glo Pro 125,...|     kerosene heater|  1.0|Dyna-Glo Pro port...|Dyna-Glo Pro|         Steel|Oranges / Peaches|
|100170|Dyna-Glo Pro 125,...|      lp gas heaters| 2.67|Dyna-Glo Pro port...|Dyna-Glo Pro|         Steel|Oranges / Peaches|
|100170|Dyna-Glo Pro 125,...|   portable air tank| 1.67|Dyna-Glo Pro port...|Dyna-Glo Pro|         Steel|Oranges / Peaches|
|100170|Dyna-Glo Pro 125,...| thin propane heater| 2.33|Dyna-Glo Pro port...|Dyna-Glo Pro|         Steel|Oranges / Peaches|
|100274|Milwaukee M12 12-...|        milwakee M12|  3.0|Milwaukee REDLITH...|   Milwaukee|         empty|              Red|
|100274|

In [85]:
from pyspark.sql import functions as sf
title = title.withColumn('joined_column', 
                    sf.concat( sf.col('description'),sf.lit('_'), sf.col('title'),sf.lit('_'), sf.col('brand'), sf.lit('_'), sf.col('material'),sf.lit('_'), sf.col('color')))
title = title.drop(title.title).drop(title.description).drop(title.brand).drop(title.material).drop(title.color)
title.show(10)

+------+--------------------+-----+--------------------+
|   pid|                term|score|       joined_column|
+------+--------------------+-----+--------------------+
|100170|     kerosene heater|  1.0|Dyna-Glo Pro port...|
|100170|      lp gas heaters| 2.67|Dyna-Glo Pro port...|
|100170|   portable air tank| 1.67|Dyna-Glo Pro port...|
|100170| thin propane heater| 2.33|Dyna-Glo Pro port...|
|100274|        milwakee M12|  3.0|Milwaukee REDLITH...|
|100274|       milwaukee m12| 1.67|Milwaukee REDLITH...|
|100274|photoelectric/ion...| 1.67|Milwaukee REDLITH...|
|100446|  glaciar bay toiled| 2.67|Choose a half flu...|
|100446|glacier bay high ...| 1.67|Choose a half flu...|
|100446|  toilet glacier bay|  3.0|Choose a half flu...|
+------+--------------------+-----+--------------------+
only showing top 10 rows



In [93]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
#tokenize terms
tokenizer = Tokenizer(inputCol="term", outputCol="term_words")
temp = tokenizer.transform(title)

hashingTF = HashingTF(inputCol="term_words", outputCol="rawFeatures", numFeatures=10)
temp = hashingTF.transform(temp)

idf = IDF(inputCol = "rawFeatures", outputCol="term_idf")
idfModel = idf.fit(temp)
temp = idfModel.transform(temp)
temp=temp.drop("rawFeatures")

#tokenize joined_column
tokenizer = Tokenizer(inputCol="joined_column", outputCol="joined_words")
temp = tokenizer.transform(temp)

hashingTF = HashingTF(inputCol="joined_words", outputCol="rawFeatures", numFeatures=10)
temp = hashingTF.transform(temp)

idf = IDF(inputCol = "rawFeatures", outputCol="joined_idf")
idfModel = idf.fit(temp)
temp = idfModel.transform(temp)
temp=temp.drop("rawFeatures")

temp.show(5)


+------+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|   pid|                term|score|       joined_column|          term_words|            term_idf|        joined_words|          joined_idf|
+------+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|100170|     kerosene heater|  1.0|Dyna-Glo Pro port...|  [kerosene, heater]|(10,[0,8],[1.1296...|[dyna-glo, pro, p...|(10,[0,1,2,3,4,5,...|
|100170|      lp gas heaters| 2.67|Dyna-Glo Pro port...|  [lp, gas, heaters]|(10,[2,5,9],[1.26...|[dyna-glo, pro, p...|(10,[0,1,2,3,4,5,...|
|100170|   portable air tank| 1.67|Dyna-Glo Pro port...|[portable, air, t...|(10,[3,4,7],[1.25...|[dyna-glo, pro, p...|(10,[0,1,2,3,4,5,...|
|100170| thin propane heater| 2.33|Dyna-Glo Pro port...|[thin, propane, h...|(10,[0,2,7],[1.12...|[dyna-glo, pro, p...|(10,[0,1,2,3,4,5,...|
|100274|     

In [128]:
result = temp
result.show(5)

+------+-------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|   pid|               term|score|       joined_column|          term_words|            term_idf|        joined_words|          joined_idf|
+------+-------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|100170|    kerosene heater|  1.0|Dyna-Glo Pro port...|  [kerosene, heater]|(10,[0,8],[1.1296...|[dyna-glo, pro, p...|(10,[0,1,2,3,4,5,...|
|100170|     lp gas heaters| 2.67|Dyna-Glo Pro port...|  [lp, gas, heaters]|(10,[2,5,9],[1.26...|[dyna-glo, pro, p...|(10,[0,1,2,3,4,5,...|
|100170|  portable air tank| 1.67|Dyna-Glo Pro port...|[portable, air, t...|(10,[3,4,7],[1.25...|[dyna-glo, pro, p...|(10,[0,1,2,3,4,5,...|
|100170|thin propane heater| 2.33|Dyna-Glo Pro port...|[thin, propane, h...|(10,[0,2,7],[1.12...|[dyna-glo, pro, p...|(10,[0,1,2,3,4,5,...|
|100274|       milwa

In [129]:
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf

#match words and then proceed to jaccard_similarity_score
def countMatchedWords(joined, term):
    l1=len(joined)
    l2=len(term)
    match = 0
    for i in range(l1):
        for j in range(l2):
            if joined[i] == term[j]:
                match+=2
            elif joined[i] in term[j]:
                match+=1
            elif term[j] in joined[i]:
                match+=1
        return match
matchUDF=udf(countMatchedWords, IntegerType())

result=result.withColumn("match", matchUDF("joined_words", "term_words"))
result.show(5)

+------+-------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|   pid|               term|score|       joined_column|          term_words|            term_idf|        joined_words|          joined_idf|match|
+------+-------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|100170|    kerosene heater|  1.0|Dyna-Glo Pro port...|  [kerosene, heater]|(10,[0,8],[1.1296...|[dyna-glo, pro, p...|(10,[0,1,2,3,4,5,...|    0|
|100170|     lp gas heaters| 2.67|Dyna-Glo Pro port...|  [lp, gas, heaters]|(10,[2,5,9],[1.26...|[dyna-glo, pro, p...|(10,[0,1,2,3,4,5,...|    0|
|100170|  portable air tank| 1.67|Dyna-Glo Pro port...|[portable, air, t...|(10,[3,4,7],[1.25...|[dyna-glo, pro, p...|(10,[0,1,2,3,4,5,...|    0|
|100170|thin propane heater| 2.33|Dyna-Glo Pro port...|[thin, propane, h...|(10,[0,2,7],[1.12...|[dyna-glo, pro, p...|(10,[0

In [130]:
from sklearn.metrics import jaccard_similarity_score
def jaccardSimilarity(term, joined):
    jscore = 0
    jscore =jaccard_similarity_score(term, joined)
    print (jscore)
    return jscore
jaccardUDF=udf(jaccardSimilarity, DoubleType())

#result = result.withColumn("term_joined_jaccard", jaccardUDF("term_idf", "joined_idf"))
result.show(5)

print (123)

+------+-------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|   pid|               term|score|       joined_column|          term_words|            term_idf|        joined_words|          joined_idf|match|
+------+-------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|100170|    kerosene heater|  1.0|Dyna-Glo Pro port...|  [kerosene, heater]|(10,[0,8],[1.1296...|[dyna-glo, pro, p...|(10,[0,1,2,3,4,5,...|    0|
|100170|     lp gas heaters| 2.67|Dyna-Glo Pro port...|  [lp, gas, heaters]|(10,[2,5,9],[1.26...|[dyna-glo, pro, p...|(10,[0,1,2,3,4,5,...|    0|
|100170|  portable air tank| 1.67|Dyna-Glo Pro port...|[portable, air, t...|(10,[3,4,7],[1.25...|[dyna-glo, pro, p...|(10,[0,1,2,3,4,5,...|    0|
|100170|thin propane heater| 2.33|Dyna-Glo Pro port...|[thin, propane, h...|(10,[0,2,7],[1.12...|[dyna-glo, pro, p...|(10,[0

In [131]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer,StringIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark import SparkContext
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType,StringType,IntegerType
from pyspark.sql import SQLContext
from pyspark.ml.feature import VectorAssembler


In [132]:
frame = result
#features=["match", "term_joined_jaccard"]
features=["match"]
assembler_features = VectorAssembler(inputCols=features, outputCol="features")
data = assembler_features.transform(frame)
data.show(10)

+------+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+-----+--------+
|   pid|                term|score|       joined_column|          term_words|            term_idf|        joined_words|          joined_idf|match|features|
+------+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+-----+--------+
|100170|     kerosene heater|  1.0|Dyna-Glo Pro port...|  [kerosene, heater]|(10,[0,8],[1.1296...|[dyna-glo, pro, p...|(10,[0,1,2,3,4,5,...|    0|   [0.0]|
|100170|      lp gas heaters| 2.67|Dyna-Glo Pro port...|  [lp, gas, heaters]|(10,[2,5,9],[1.26...|[dyna-glo, pro, p...|(10,[0,1,2,3,4,5,...|    0|   [0.0]|
|100170|   portable air tank| 1.67|Dyna-Glo Pro port...|[portable, air, t...|(10,[3,4,7],[1.25...|[dyna-glo, pro, p...|(10,[0,1,2,3,4,5,...|    0|   [0.0]|
|100170| thin propane heater| 2.33|Dyna-Glo Pro port...|[thin, p

In [121]:
data.select("features").show(20, truncate=False)

+--------+
|features|
+--------+
|[0.0]   |
|[0.0]   |
|[0.0]   |
|[0.0]   |
|[0.0]   |
|[2.0]   |
|[0.0]   |
|[0.0]   |
|[0.0]   |
|[0.0]   |
|[0.0]   |
|[0.0]   |
|[0.0]   |
|[0.0]   |
|[0.0]   |
|[0.0]   |
|[0.0]   |
|[0.0]   |
|[0.0]   |
|[0.0]   |
+--------+
only showing top 20 rows



In [136]:
data.registerTempTable("data")
trainData = sql_sc.sql("SELECT * from data where score is not NULL")
trainData.show(5)
testData = sql_sc.sql("SELECT * from data where score is NULL")
testData.show(5)
trainData.select("pid", "features").show(10, truncate=False)
testData.select("pid", "features").show(10, truncate=False)

+------+-------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+-----+--------+
|   pid|               term|score|       joined_column|          term_words|            term_idf|        joined_words|          joined_idf|match|features|
+------+-------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+-----+--------+
|100170|    kerosene heater|  1.0|Dyna-Glo Pro port...|  [kerosene, heater]|(10,[0,8],[1.1296...|[dyna-glo, pro, p...|(10,[0,1,2,3,4,5,...|    0|   [0.0]|
|100170|     lp gas heaters| 2.67|Dyna-Glo Pro port...|  [lp, gas, heaters]|(10,[2,5,9],[1.26...|[dyna-glo, pro, p...|(10,[0,1,2,3,4,5,...|    0|   [0.0]|
|100170|  portable air tank| 1.67|Dyna-Glo Pro port...|[portable, air, t...|(10,[3,4,7],[1.25...|[dyna-glo, pro, p...|(10,[0,1,2,3,4,5,...|    0|   [0.0]|
|100170|thin propane heater| 2.33|Dyna-Glo Pro port...|[thin, propane,

In [137]:
(trainD, validD) = trainData.randomSplit([0.8,0.2])
rf = RandomForestRegressor(featuresCol="features", labelCol="score", numTrees=11, maxDepth=5)
pipeline = Pipeline(stages=[rf])
model=pipeline.fit(trainD)
predictions = model.transform(validD)