In [17]:
import pyspark as ps
import warnings
import random
import os

from pyspark.ml.feature import MinMaxScaler, VectorAssembler

In [3]:
try:
    # we try to create a SparkContext to work locally on all cpus available
    sc = ps.SparkContext('local[4]')
    print("Just created a SparkContext")
except ValueError:
    # give a warning if SparkContext already exists (for use inside pyspark)
    warnings.warn("SparkContext already exists in this scope")


Just created a SparkContext


In [4]:
sc

In [17]:
n = 10000000
heads = (sc.parallelize(range(n))
    .map(lambda _: random.random())
    .filter(lambda r: r < 0.5)
    .count())
    
tails = n - heads
ratio = heads / n

print('heads =', heads)
print('tails =', tails)
print('ratio =', ratio)

heads = 4996900
tails = 5003100
ratio = 0.49969


In [18]:
def  is_prime(number):
    factor_min = 2
    factor_max = int(number ** 0.5) + 1
    for factor in range(factor_min, factor_max):
        if number % factor == 0:
            return False
    return True

In [19]:
numbers = range(2, 100)

primes = (sc.parallelize(numbers)
    .filter(is_prime)
    .collect())

print(primes)

[2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97]


In [5]:
link = 's3a://mortar-example-data/airline-data'
rdd = sc.textFile(link)

In [6]:
rdd.getNumPartitions()

11

In [7]:
rdd.count()

5113194

In [10]:
df = spark.read.csv('../data/referral/physician-shared-patient-patterns-2014-days180-sample.txt')

In [12]:
type(df)

pyspark.sql.dataframe.DataFrame

In [13]:
df.show()

+----------+----------+----------+----------+---+
|       _c0|       _c1|       _c2|       _c3|_c4|
+----------+----------+----------+----------+---+
|1063590222|1760575641|96        |27        |  3|
|1790184497|1427005735|68        |13        |  0|
|1154381309|1861495335|253       |20        | 15|
|1538144910|1801826219|40        |22        |  4|
|1134293798|1346341948|68        |24        |  6|
|1578678975|1720057714|540       |199       | 24|
|1518905249|1952385098|111       |12        |  9|
|1689976649|1982643235|67        |14        |  3|
|1851355804|1164748810|209       |37        |  0|
|1710971585|1821191149|475       |38        |  4|
|1992735088|1336186972|78        |42        |  0|
|1902072143|1386651297|179       |92        |  0|
|1700954252|1275531832|72        |11        |  0|
|1962509422|1689943334|297       |35        |  0|
|1750363016|1295789907|548       |65        |  0|
|1801891791|1215927538|50        |11        |  0|
|1881694222|1467494161|157       |18        |  0|


In [23]:
vectorAssembler = VectorAssembler(inputCols=['_c3'], outputCol='features')

In [25]:
# df_vector = vectorAssembler.transform(df)

In [26]:
spark

In [27]:
sc

In [37]:
file_path = '../../../Galvanize/G65DS/DSI_Lectures/spark-aws/moses_marsh/data/aapl.csv'

# read CSV# read  
df_aapl = spark.read.csv(file_path,
                         header=True,       # use headers or not
                         quote='"',         # char for quotes
                         sep=",",           # char for separation
                         inferSchema=True)  # do we infer schema or not ?

df_aapl.show(5)

+-------------------+----------+----------+----------+----------+--------+----------+
|               Date|      Open|      High|       Low|     Close|  Volume| Adj Close|
+-------------------+----------+----------+----------+----------+--------+----------+
|2016-10-25 00:00:00|117.949997|118.360001|117.309998|    118.25|39190300|    118.25|
|2016-10-24 00:00:00|117.099998|117.739998|     117.0|117.650002|23538700|117.650002|
|2016-10-21 00:00:00|116.809998|116.910004|116.279999|116.599998|23192700|116.599998|
|2016-10-20 00:00:00|116.860001|117.379997|116.330002|117.059998|24125800|117.059998|
|2016-10-19 00:00:00|    117.25|117.760002|113.800003|117.120003|20034600|117.120003|
+-------------------+----------+----------+----------+----------+--------+----------+
only showing top 5 rows



In [38]:
from pyspark.ml.feature import MinMaxScaler, VectorAssembler

# assemble values in a vector
vectorAssembler = VectorAssembler(inputCols=["Open","High", "Low","Close"],
                                  outputCol="features")

df_vector = vectorAssembler.transform(df_aapl)
df_vector.select(['Open', 'High', 'Low', 'Close', 'features']).show(5)

print("***"*25)

df_vector.select('features').show(5)

print("***"*25)

df_vector.select('features').take(5)

+----------+----------+----------+----------+--------------------+
|      Open|      High|       Low|     Close|            features|
+----------+----------+----------+----------+--------------------+
|117.949997|118.360001|117.309998|    118.25|[117.949997,118.3...|
|117.099998|117.739998|     117.0|117.650002|[117.099998,117.7...|
|116.809998|116.910004|116.279999|116.599998|[116.809998,116.9...|
|116.860001|117.379997|116.330002|117.059998|[116.860001,117.3...|
|    117.25|117.760002|113.800003|117.120003|[117.25,117.76000...|
+----------+----------+----------+----------+--------------------+
only showing top 5 rows

***************************************************************************
+--------------------+
|            features|
+--------------------+
|[117.949997,118.3...|
|[117.099998,117.7...|
|[116.809998,116.9...|
|[116.860001,117.3...|
|[117.25,117.76000...|
+--------------------+
only showing top 5 rows

****************************************************************

[Row(features=DenseVector([117.95, 118.36, 117.31, 118.25])),
 Row(features=DenseVector([117.1, 117.74, 117.0, 117.65])),
 Row(features=DenseVector([116.81, 116.91, 116.28, 116.6])),
 Row(features=DenseVector([116.86, 117.38, 116.33, 117.06])),
 Row(features=DenseVector([117.25, 117.76, 113.8, 117.12]))]

In [39]:
scaler = MinMaxScaler(inputCol="features", outputCol="scaledfeatures")

# Compute summary statistics and generate MinMaxScalerModel
scalerModel = scaler.fit(df_vector)

# rescale each feature to range [min, max].
scaledData = scalerModel.transform(df_vector)
scaledData.select("features", "scaledfeatures").show(5)

print("***"*25)

scaledData.select("scaledfeatures").take(5)

+--------------------+--------------------+
|            features|      scaledfeatures|
+--------------------+--------------------+
|[117.949997,118.3...|[0.84364622791846...|
|[117.099998,117.7...|[0.81798975110079...|
|[116.809998,116.9...|[0.80923635459429...|
|[116.860001,117.3...|[0.81074565144089...|
|[117.25,117.76000...|[0.82251743035171...|
+--------------------+--------------------+
only showing top 5 rows

***************************************************************************


[Row(scaledfeatures=DenseVector([0.8436, 0.8302, 0.8659, 0.866])),
 Row(scaledfeatures=DenseVector([0.818, 0.8109, 0.8563, 0.8473])),
 Row(scaledfeatures=DenseVector([0.8092, 0.7851, 0.8339, 0.8148])),
 Row(scaledfeatures=DenseVector([0.8107, 0.7997, 0.8355, 0.829])),
 Row(scaledfeatures=DenseVector([0.8225, 0.8115, 0.7568, 0.8309]))]

In [41]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

# Prepare training documents from a list of (id, text, label) tuples.
training = spark.createDataFrame([
    (0, "a a a b b c a d spark", 1.0),
    (1, "b c c c d c c a", 0.0),
    (2, "spark spark a a c spam", 1.0),
    (3, "c d d b d spam", 0.0)
], ["id", "text", "label"])

In [44]:
# Prepare test documents, which are unlabeled (id, text) tuples.# Prepar 
test = spark.createDataFrame([
    (4, "spark a a a a"),
    (5, "c c c p"),
    (6, "spark spam spark a"),
    (7, "a a a c c c")
], ["id", "text"])

# What do we need to do to this to get a prediction?

In [42]:
# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.# Config 
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# Fit the pipeline to training documents.
model = pipeline.fit(training)

In [45]:
#How can we test this against our training data?
prediction = model.transform(test)
prediction.select(['features', 'prediction', 'probability']).show()

+--------------------+----------+--------------------+
|            features|prediction|         probability|
+--------------------+----------+--------------------+
|(262144,[227410,2...|       1.0|[4.25735553078518...|
|(262144,[28698,21...|       0.0|[0.97099160578993...|
|(262144,[197793,2...|       1.0|[0.00333910522987...|
|(262144,[28698,22...|       1.0|[0.33293838014924...|
+--------------------+----------+--------------------+



In [46]:
1+1

2