### Python superscript

#### Table of content

    1 Spark Python
    1.1 Load modules/packages
    1.2 Spark connection setup
    1.3 RDD operations
    1.4 DF's and data-manipulation
    1.5 Transforming, extracting and selecting features
    1.6 Spark SQL
    1.7 Machine learning pipelines
    1.8 Correlation
    1.9 Logistic regression
    1.10 Recommendation system


#### 1.1 Load modules/packages



In [58]:
#Importing the neccesary modules/packages
from pyspark import SparkContext
from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import ChiSquareTest
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import Word2Vec
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import FeatureHasher
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import NGram
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import Binarizer
from pyspark.ml.feature import IndexToString, StringIndexer
from pyspark.ml.feature import OneHotEncoderEstimator
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors




#### 1.2 Spark connection setup




In [42]:
# Import SparkContext from pyspark
from pyspark import SparkContext

In [43]:
# Create the sc
sc = SparkContext.getOrCreate()

#### 1.3 RDD operations

In [2]:
# Lets create a list with names and values 
data = sc.parallelize([('Jordan',100),('Jason',150),('Jack',200)])

In [5]:
# Create a data example 
data_example = sc.parallelize([('Jordan',100),('Jason',150),('Jack',200)]).collect()

In [6]:
# Printing the data structure of the RDD
data_example

[('Jordan', 100), ('Jason', 150), ('Jack', 200)]

In [7]:
# Returning 'Jack'
data_example[2]

('Jack', 200)

In [8]:
# Lets create a new list
data_second = [1,2,3,4,5,6,7,8,9,10]

In [9]:
# Lets create a distributed dataset
distData = sc.parallelize(data_second)

In [10]:
 # Reducing the elements in the list by adding them up
distData.reduce(lambda a,b:a+b)

55

#### 1.4 DF's and data-manipulation

In [5]:
# Starting and initiating the Spark session
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
 .config("spark.some.config.option", "some-value") \
 .getOrCreate()

In [4]:
# Read in and attach the CSV
df = spark.read.csv('creditcard.csv')

In [None]:
# Print the datatype
df.printSchema()

In [None]:
df.head

In [None]:
# Count the dataframe
df.count()

In [None]:
# Run summary statistics
df.describe().show()

In [None]:
# Dropping the na’s in the dataframe
df.dropna().count()

In [None]:
# Filling null values
df.fillna(0).show(5)

In [None]:
# Show top 5 rows
df.show(5)

#### 1.5 Transforming, extracting and selecting features

In [16]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

# Creating a Spark dataframe with 3 sentences.
sentenceData = spark.createDataFrame([
    (0.0, "Sentence number 1"),
    (0.0, "Sentence number 2"),
    (1.0, "Sentence number 3")
], ["label", "sentence"])

# Splitting each of 3 sentences into words with Tokenizer. 
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)

#Hasing the sentences ino a feature vector with HashingTF. 
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)

#Rescaling the feature vectors with IDF to improve text feature performance. 
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)

# Passing the featur vectors to the learning algorithm.
rescaledData = idfModel.transform(featurizedData)

rescaledData.select("label", "features").show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(20,[3,4,12],[0.0...|
|  0.0|(20,[3,9,12],[0.0...|
|  1.0|(20,[3,12,18],[0....|
+-----+--------------------+



In [23]:
from pyspark.ml.feature import Word2Vec

# Creating a Sparkdataframe based on sequence of words(Can also be document text).
documentDF = spark.createDataFrame([
    ("This is sentence 1".split(" "), ),
    ("This is sentence 2".split(" "), ),
    ("This is sentence 3".split(" "), )
], ["text"])

# Learning a mapping from words to Vectors/Transforming each document into a feature vector.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)

#Passing the feature vector to the learning algorithm.
result = model.transform(documentDF)
for row in result.collect():
    text, vector = row
    print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))

Text: [This, is, sentence, 1] => 
Vector: [0.00933781731874,-0.0693989954889,-0.0287429555319]

Text: [This, is, sentence, 2] => 
Vector: [0.0329760909081,-0.00654029287398,-0.0481778117828]

Text: [This, is, sentence, 3] => 
Vector: [-0.0434353947639,-0.053800560534,-0.0430120560341]



In [25]:
from pyspark.ml.feature import CountVectorizer

# creating a Sparkdataframe with a bag of words per row + ID. 
df = spark.createDataFrame([
    (0, "a b c".split(" ")),
    (1, "a b b c a".split(" "))
], ["id", "words"])

# fitting a CountVectorizerModel 
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)

model = cv.fit(df)

result = model.transform(df)
result.show(truncate=False)

+---+---------------+-------------------------+
|id |words          |features                 |
+---+---------------+-------------------------+
|0  |[a, b, c]      |(3,[0,1,2],[1.0,1.0,1.0])|
|1  |[a, b, b, c, a]|(3,[0,1,2],[2.0,2.0,1.0])|
+---+---------------+-------------------------+



In [29]:
from pyspark.ml.feature import FeatureHasher

# Creating a Sparkdataframe with 4 columns real, bool, stringNum, and string. 
dataset = spark.createDataFrame([
    (2.2, True, "1", "foo"),
    (3.3, False, "2", "bar"),
    (4.4, False, "3", "baz"),
    (5.5, False, "4", "foo")
], ["real", "bool", "stringNum", "string"])

# Hashing the features to new output column: Features.
hasher = FeatureHasher(inputCols=["real", "bool", "stringNum", "string"],
                       outputCol="features")

featurized = hasher.transform(dataset)
featurized.show(truncate=False)

+----+-----+---------+------+--------------------------------------------------------+
|real|bool |stringNum|string|features                                                |
+----+-----+---------+------+--------------------------------------------------------+
|2.2 |true |1        |foo   |(262144,[174475,247670,257907,262126],[2.2,1.0,1.0,1.0])|
|3.3 |false|2        |bar   |(262144,[70644,89673,173866,174475],[1.0,1.0,1.0,3.3])  |
|4.4 |false|3        |baz   |(262144,[22406,70644,174475,187923],[1.0,1.0,4.4,1.0])  |
|5.5 |false|4        |foo   |(262144,[70644,101499,174475,257907],[1.0,1.0,5.5,1.0]) |
+----+-----+---------+------+--------------------------------------------------------+



In [34]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

#Creating the Sparkdataframe.
sentenceDataFrame = spark.createDataFrame([
    (0, "Sentence number 1"),
    (1, "Sentence number 2"),
    (2, "Sentence number 3")
], ["id", "sentence"])

# Tokenizing text(sentences) into words.
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")

regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps(False)

countTokens = udf(lambda words: len(words), IntegerType())

tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")\
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)

regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words") \
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)

+-----------------+---------------------+------+
|sentence         |words                |tokens|
+-----------------+---------------------+------+
|Sentence number 1|[sentence, number, 1]|3     |
|Sentence number 2|[sentence, number, 2]|3     |
|Sentence number 3|[sentence, number, 3]|3     |
+-----------------+---------------------+------+

+-----------------+---------------------+------+
|sentence         |words                |tokens|
+-----------------+---------------------+------+
|Sentence number 1|[sentence, number, 1]|3     |
|Sentence number 2|[sentence, number, 2]|3     |
|Sentence number 3|[sentence, number, 3]|3     |
+-----------------+---------------------+------+



In [39]:
from pyspark.ml.feature import StopWordsRemover

# Creating the Sparkdataframe
sentenceData = spark.createDataFrame([
    (0, ["I", "am", "working", "in", "Pyspark"]),
    (1, ["I", "am", "now", "applying", "a", "StopWordsRemover function", "from","pyspark.ml.feature"])
], ["id", "raw"])

# Removing stopwords
remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData).show(truncate=False)

+---+------------------------------------------------------------------------------+---------------------------------------------------------+
|id |raw                                                                           |filtered                                                 |
+---+------------------------------------------------------------------------------+---------------------------------------------------------+
|0  |[I, am, working, in, Pyspark]                                                 |[working, Pyspark]                                       |
|1  |[I, am, now, applying, a, StopWordsRemover function, from, pyspark.ml.feature]|[applying, StopWordsRemover function, pyspark.ml.feature]|
+---+------------------------------------------------------------------------------+---------------------------------------------------------+



In [45]:
from pyspark.ml.feature import NGram

# Creating the Spark dataframe
wordDataFrame = spark.createDataFrame([
    (0, ["I", "am", "working", "in", "Pyspark"]),
    (1, ["This","is","how", "to", "apply", "the", "NGram function", "from","the", "pyspark.ml.feature"]),
    (2, ["Ending", "it", "with", "this", "sentence"])
], ["id", "words"])

# Applying NGram = 2
ngram = NGram(n=2, inputCol="words", outputCol="ngrams")

ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
|ngrams                                                                                                                                                       |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[I am working, am working in, working in Pyspark]                                                                                                            |
|[This is how, is how to, how to apply, to apply the, apply the NGram function, the NGram function from, NGram function from the, from the pyspark.ml.feature]|
|[Ending it with, it with this, with this sentence]                                                                                                           |
+---------------------------------------

In [50]:
from pyspark.ml.feature import Binarizer

# Creating Sparkdataframe
continuousDataFrame = spark.createDataFrame([
    (0, 0.1),
    (1, 0.8),
    (2, 0.2)
], ["id", "feature"])

# Thresholding the numerical features > 0.5 to 1 
binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")

binarizedDataFrame = binarizer.transform(continuousDataFrame)

print("Binarizer output with Threshold = %f" % binarizer.getThreshold())
binarizedDataFrame.show()

Binarizer output with Threshold = 0.500000
+---+-------+-----------------+
| id|feature|binarized_feature|
+---+-------+-----------------+
|  0|    0.1|              0.0|
|  1|    0.8|              1.0|
|  2|    0.2|              0.0|
+---+-------+-----------------+



In [54]:
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

#Creating the Sparkdataframe
data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
        (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
        (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data, ["features"])

# Applying PCA and converting observartions into principal components
pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)

# Result: 5 feature vectors reduced to 3 principal components.
result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)

+-----------------------------------------------------------+
|pcaFeatures                                                |
+-----------------------------------------------------------+
|[1.6485728230883807,-4.013282700516296,-5.524543751369388] |
|[-4.645104331781534,-1.1167972663619026,-5.524543751369387]|
|[-6.428880535676489,-5.337951427775355,-5.524543751369389] |
+-----------------------------------------------------------+



In [58]:
from pyspark.ml.feature import PolynomialExpansion
from pyspark.ml.linalg import Vectors

# Creating Spark dataframe
df = spark.createDataFrame([
    (Vectors.dense([2.0, 1.0]),),
    (Vectors.dense([0.0, 0.0]),),
    (Vectors.dense([3.0, -1.0]),)
], ["features"])

#Expanding the features into 3-degree polynomial space.
polyExpansion = PolynomialExpansion(degree=3, inputCol="features", outputCol="polyFeatures")
polyDF = polyExpansion.transform(df)

polyDF.show(truncate=False)

+----------+------------------------------------------+
|features  |polyFeatures                              |
+----------+------------------------------------------+
|[2.0,1.0] |[2.0,4.0,8.0,1.0,2.0,4.0,1.0,2.0,1.0]     |
|[0.0,0.0] |[0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]     |
|[3.0,-1.0]|[3.0,9.0,27.0,-1.0,-3.0,-9.0,1.0,3.0,-1.0]|
+----------+------------------------------------------+



In [62]:
from pyspark.ml.feature import DCT
from pyspark.ml.linalg import Vectors

# Creating Spark dataframe.
df = spark.createDataFrame([
    (Vectors.dense([0.0, 1.0, -2.0, 3.0]),),
    (Vectors.dense([-1.0, 2.0, 4.0, -7.0]),),
    (Vectors.dense([14.0, -2.0, -5.0, 1.0]),)], ["features"])

# DCT Transforming a length N real-valued sequence from time domain to frequency domain. 
dct = DCT(inverse=False, inputCol="features", outputCol="featuresDCT")

dctDf = dct.transform(df)

dctDf.select("featuresDCT").show(truncate=False)

+----------------------------------------------------------------+
|featuresDCT                                                     |
+----------------------------------------------------------------+
|[1.0,-1.1480502970952693,2.0000000000000004,-2.7716385975338604]|
|[-1.0,3.378492794482933,-7.000000000000001,2.9301512653149677]  |
|[4.0,9.304453421915744,11.000000000000002,1.5579302036357163]   |
+----------------------------------------------------------------+



In [63]:
from pyspark.ml.feature import StringIndexer

# Creating Sparkdataframe
df = spark.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["id", "category"])

# StringIndex encoding string column of labels to a column of label indices.
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()

+---+--------+-------------+
| id|category|categoryIndex|
+---+--------+-------------+
|  0|       a|          0.0|
|  1|       b|          2.0|
|  2|       c|          1.0|
|  3|       a|          0.0|
|  4|       a|          0.0|
|  5|       c|          1.0|
+---+--------+-------------+



In [66]:
from pyspark.ml.feature import IndexToString, StringIndexer

# Creating Spark dataframe
df = spark.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["id", "category"])

# Mapping a column of label indices back to a column containing the original labels as strings
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = indexer.fit(df)
indexed = model.transform(df)

print("Transformed string column '%s' to indexed column '%s'"
      % (indexer.getInputCol(), indexer.getOutputCol()))
indexed.show()

print("StringIndexer has stored labels in output column metadata\n")

converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")
converted = converter.transform(indexed)

print("Transformed indexed column '%s' back to original string column '%s' using "
      "labels in metadata" % (converter.getInputCol(), converter.getOutputCol()))
converted.select("id", "categoryIndex", "originalCategory").show()

Transformed string column 'category' to indexed column 'categoryIndex'
+---+--------+-------------+
| id|category|categoryIndex|
+---+--------+-------------+
|  0|       a|          0.0|
|  1|       b|          2.0|
|  2|       c|          1.0|
|  3|       a|          0.0|
|  4|       a|          0.0|
|  5|       c|          1.0|
+---+--------+-------------+

StringIndexer will store labels in output column metadata

Transformed indexed column 'categoryIndex' back to original string column 'originalCategory' using labels in metadata
+---+-------------+----------------+
| id|categoryIndex|originalCategory|
+---+-------------+----------------+
|  0|          0.0|               a|
|  1|          2.0|               b|
|  2|          1.0|               c|
|  3|          0.0|               a|
|  4|          0.0|               a|
|  5|          1.0|               c|
+---+-------------+----------------+



In [67]:
from pyspark.ml.feature import OneHotEncoderEstimator

# Creating Sparkdataframe
df = spark.createDataFrame([
    (0.0, 1.0),
    (1.0, 0.0),
    (2.0, 1.0),
    (0.0, 2.0),
    (0.0, 1.0),
    (2.0, 0.0)
], ["categoryIndex1", "categoryIndex2"])

#One-hot encoding categorical features(label index) to binary vectors 
encoder = OneHotEncoderEstimator(inputCols=["categoryIndex1", "categoryIndex2"],
                                 outputCols=["categoryVec1", "categoryVec2"])
model = encoder.fit(df)
encoded = model.transform(df)
encoded.show()

+--------------+--------------+-------------+-------------+
|categoryIndex1|categoryIndex2| categoryVec1| categoryVec2|
+--------------+--------------+-------------+-------------+
|           0.0|           1.0|(2,[0],[1.0])|(2,[1],[1.0])|
|           1.0|           0.0|(2,[1],[1.0])|(2,[0],[1.0])|
|           2.0|           1.0|    (2,[],[])|(2,[1],[1.0])|
|           0.0|           2.0|(2,[0],[1.0])|    (2,[],[])|
|           0.0|           1.0|(2,[0],[1.0])|(2,[1],[1.0])|
|           2.0|           0.0|    (2,[],[])|(2,[0],[1.0])|
+--------------+--------------+-------------+-------------+



In [72]:
from pyspark.ml.feature import VectorIndexer

# Reading in a dataset of labeled points
data = spark.read.format("libsvm").load("C:/big-datademo/superscripts/data/libsvm.txt")

# Applying vectorIndexer to decide which features should be treated as categorical
indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=10)
indexerModel = indexer.fit(data)

categoricalFeatures = indexerModel.categoryMaps
print("Chose %d categorical features: %s" %
      (len(categoricalFeatures), ", ".join(str(k) for k in categoricalFeatures.keys())))

# Creating a new column "indexed" with categorical values transformed to indices
indexedData = indexerModel.transform(data)
indexedData.show()

Chose 351 categorical features: 645, 69, 365, 138, 101, 479, 333, 249, 0, 555, 666, 88, 170, 115, 276, 308, 5, 449, 120, 247, 614, 677, 202, 10, 56, 533, 142, 500, 340, 670, 174, 42, 417, 24, 37, 25, 257, 389, 52, 14, 504, 110, 587, 619, 196, 559, 638, 20, 421, 46, 93, 284, 228, 448, 57, 78, 29, 475, 164, 591, 646, 253, 106, 121, 84, 480, 147, 280, 61, 221, 396, 89, 133, 116, 1, 507, 312, 74, 307, 452, 6, 248, 60, 117, 678, 529, 85, 201, 220, 366, 534, 102, 334, 28, 38, 561, 392, 70, 424, 192, 21, 137, 165, 33, 92, 229, 252, 197, 361, 65, 97, 665, 583, 285, 224, 650, 615, 9, 53, 169, 593, 141, 610, 420, 109, 256, 225, 339, 77, 193, 669, 476, 642, 637, 590, 679, 96, 393, 647, 173, 13, 41, 503, 134, 73, 105, 2, 508, 311, 558, 674, 530, 586, 618, 166, 32, 34, 148, 45, 161, 279, 64, 689, 17, 149, 584, 562, 176, 423, 191, 22, 44, 59, 118, 281, 27, 641, 71, 391, 12, 445, 54, 313, 611, 144, 49, 335, 86, 672, 172, 113, 681, 219, 419, 81, 230, 362, 451, 76, 7, 39, 649, 98, 616, 477, 367, 535, 1

In [86]:
from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors

# Creating Sparkdataframe
dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.5, -1.0]),),
    (1, Vectors.dense([2.0, 1.0, 1.0]),),
    (2, Vectors.dense([4.0, 10.0, 2.0]),)
], ["id", "features"])

# Normalizing each Vector 1
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
l1NormData = normalizer.transform(dataFrame)
print("Normalized using L^1 norm")
l1NormData.show()

# Normalize each Vector 2
lInfNormData = normalizer.transform(dataFrame, {normalizer.p: float("inf")})
print("Normalized using L^inf norm")
lInfNormData.show()

Normalized using L^1 norm
+---+--------------+------------------+
| id|      features|      normFeatures|
+---+--------------+------------------+
|  0|[1.0,0.5,-1.0]|    [0.4,0.2,-0.4]|
|  1| [2.0,1.0,1.0]|   [0.5,0.25,0.25]|
|  2|[4.0,10.0,2.0]|[0.25,0.625,0.125]|
+---+--------------+------------------+

Normalized using L^inf norm
+---+--------------+--------------+
| id|      features|  normFeatures|
+---+--------------+--------------+
|  0|[1.0,0.5,-1.0]|[1.0,0.5,-1.0]|
|  1| [2.0,1.0,1.0]| [1.0,0.5,0.5]|
|  2|[4.0,10.0,2.0]| [0.4,1.0,0.2]|
+---+--------------+--------------+



In [88]:
from pyspark.ml.feature import StandardScaler

# Reading in text data into Sparkdataframe
dataFrame = spark.read.format("libsvm").load("C:/big-datademo/superscripts/data/libsvm.txt")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)

# Computing summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(dataFrame)

# Normalizing each feature to have unit standard deviation
scaledData = scalerModel.transform(dataFrame)
scaledData.show()

+-----+--------------------+--------------------+
|label|            features|      scaledFeatures|
+-----+--------------------+--------------------+
|  0.0|(692,[127,128,129...|(692,[127,128,129...|
|  1.0|(692,[158,159,160...|(692,[158,159,160...|
|  1.0|(692,[124,125,126...|(692,[124,125,126...|
|  1.0|(692,[152,153,154...|(692,[152,153,154...|
|  1.0|(692,[151,152,153...|(692,[151,152,153...|
|  0.0|(692,[129,130,131...|(692,[129,130,131...|
|  1.0|(692,[158,159,160...|(692,[158,159,160...|
|  1.0|(692,[99,100,101,...|(692,[99,100,101,...|
|  0.0|(692,[154,155,156...|(692,[154,155,156...|
|  0.0|(692,[127,128,129...|(692,[127,128,129...|
|  1.0|(692,[154,155,156...|(692,[154,155,156...|
|  0.0|(692,[153,154,155...|(692,[153,154,155...|
|  0.0|(692,[151,152,153...|(692,[151,152,153...|
|  1.0|(692,[129,130,131...|(692,[129,130,131...|
|  0.0|(692,[154,155,156...|(692,[154,155,156...|
|  1.0|(692,[150,151,152...|(692,[150,151,152...|
|  0.0|(692,[124,125,126...|(692,[124,125,126...|


In [89]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

# Creating Spark dataframe
dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -1.0]),),
    (1, Vectors.dense([2.0, 1.1, 1.0]),),
    (2, Vectors.dense([3.0, 10.1, 3.0]),)
], ["id", "features"])

scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

# Computing summary statistics and generating MinMaxScalerModel
scalerModel = scaler.fit(dataFrame)

# Rescaling each feature to range [min, max].
scaledData = scalerModel.transform(dataFrame)
print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))
scaledData.select("features", "scaledFeatures").show()

Features scaled to range: [0.000000, 1.000000]
+--------------+--------------+
|      features|scaledFeatures|
+--------------+--------------+
|[1.0,0.1,-1.0]| [0.0,0.0,0.0]|
| [2.0,1.1,1.0]| [0.5,0.1,0.5]|
|[3.0,10.1,3.0]| [1.0,1.0,1.0]|
+--------------+--------------+



In [91]:
from pyspark.ml.feature import MaxAbsScaler
from pyspark.ml.linalg import Vectors

# Creating dataframe
dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -8.0]),),
    (1, Vectors.dense([2.0, 1.0, -4.0]),),
    (2, Vectors.dense([4.0, 10.0, 8.0]),)
], ["id", "features"])

scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")

# Computing summary statistics and generating MaxAbsScalerModel
scalerModel = scaler.fit(dataFrame)

# rescaling each feature 
scaledData = scalerModel.transform(dataFrame)

scaledData.select("features", "scaledFeatures").show()

+--------------+----------------+
|      features|  scaledFeatures|
+--------------+----------------+
|[1.0,0.1,-8.0]|[0.25,0.01,-1.0]|
|[2.0,1.0,-4.0]|  [0.5,0.1,-0.5]|
|[4.0,10.0,8.0]|   [1.0,1.0,1.0]|
+--------------+----------------+



In [93]:
from pyspark.ml.feature import Bucketizer

# Defining splits
splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]

# Creating Sparkdataframe
data = [(-999.9,), (-0.5,), (-0.3,), (0.0,), (0.2,), (999.9,)]
dataFrame = spark.createDataFrame(data, ["features"])

# Defining bucketizer 
bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")

# Transforming original data into bucket index
bucketedData = bucketizer.transform(dataFrame)

print("Bucketizer output with %d buckets" % (len(bucketizer.getSplits())-1))
bucketedData.show()

Bucketizer output with 4 buckets
+--------+----------------+
|features|bucketedFeatures|
+--------+----------------+
|  -999.9|             0.0|
|    -0.5|             1.0|
|    -0.3|             1.0|
|     0.0|             2.0|
|     0.2|             2.0|
|   999.9|             3.0|
+--------+----------------+



In [94]:
from pyspark.ml.feature import ElementwiseProduct
from pyspark.ml.linalg import Vectors

# Creating vector data
data = [(Vectors.dense([1.0, 2.0, 3.0]),), (Vectors.dense([4.0, 5.0, 6.0]),)]
df = spark.createDataFrame(data, ["vector"])
transformer = ElementwiseProduct(scalingVec=Vectors.dense([0.0, 1.0, 2.0]),
                                 inputCol="vector", outputCol="transformedVector")
# Batch transforming the vectors to create new column
transformer.transform(df).show()

+-------------+-----------------+
|       vector|transformedVector|
+-------------+-----------------+
|[1.0,2.0,3.0]|    [0.0,2.0,6.0]|
|[4.0,5.0,6.0]|   [0.0,5.0,12.0]|
+-------------+-----------------+



In [95]:
from pyspark.ml.feature import SQLTransformer

# Creating Spark dataframe
df = spark.createDataFrame([
    (0, 1.0, 3.0),
    (2, 2.0, 5.0)
], ["id", "v1", "v2"])

# SQL Transforming dataframe
sqlTrans = SQLTransformer(
    statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df).show()

+---+---+---+---+----+
| id| v1| v2| v3|  v4|
+---+---+---+---+----+
|  0|1.0|3.0|4.0| 3.0|
|  2|2.0|5.0|7.0|10.0|
+---+---+---+---+----+



In [97]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# Creating Sparkdataframe
dataset = spark.createDataFrame(
    [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],
    ["id", "hour", "mobile", "userFeatures", "clicked"])

# Vectorassembling inputcolumns to outputcolumn: features
assembler = VectorAssembler(
    inputCols=["hour", "mobile", "userFeatures"],
    outputCol="features")

output = assembler.transform(dataset)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(truncate=False)

Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'
+-----------------------+-------+
|features               |clicked|
+-----------------------+-------+
|[18.0,1.0,0.0,10.0,0.5]|1.0    |
+-----------------------+-------+



In [99]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import (VectorSizeHint, VectorAssembler)

# Creating Spark dataframe
dataset = spark.createDataFrame(
    [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0),
     (0, 18, 1.0, Vectors.dense([0.0, 10.0]), 0.0)],
    ["id", "hour", "mobile", "userFeatures", "clicked"])

sizeHint = VectorSizeHint(
    inputCol="userFeatures",
    handleInvalid="skip",
    size=3)

datasetWithSize = sizeHint.transform(dataset)
print("Rows where 'userFeatures' is not the right size are filtered out")
datasetWithSize.show(truncate=False)

# Vectorassembling inputcolumns to outputcolumn: features
assembler = VectorAssembler(
    inputCols=["hour", "mobile", "userFeatures"],
    outputCol="features")

# Applying downstream transformers 
output = assembler.transform(datasetWithSize)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(truncate=False)

Rows where 'userFeatures' is not the right size are filtered out
+---+----+------+--------------+-------+
|id |hour|mobile|userFeatures  |clicked|
+---+----+------+--------------+-------+
|0  |18  |1.0   |[0.0,10.0,0.5]|1.0    |
+---+----+------+--------------+-------+

Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'
+-----------------------+-------+
|features               |clicked|
+-----------------------+-------+
|[18.0,1.0,0.0,10.0,0.5]|1.0    |
+-----------------------+-------+



In [101]:
from pyspark.ml.feature import QuantileDiscretizer

# Creating Sparkdataframe
data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2)]
df = spark.createDataFrame(data, ["id", "hour"])

# QuantileDiscretizing column with continuous features to output column with binned categorical features
discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result")

result = discretizer.fit(df).transform(df)
result.show()

+---+----+------+
| id|hour|result|
+---+----+------+
|  0|18.0|   2.0|
|  1|19.0|   2.0|
|  2| 8.0|   1.0|
|  3| 5.0|   1.0|
|  4| 2.2|   0.0|
+---+----+------+



In [103]:
from pyspark.ml.feature import Imputer

# Creating Sparkdataframe
df = spark.createDataFrame([
    (1.0, float("nan")),
    (2.0, float("nan")),
    (float("nan"), 3.0),
    (4.0, 4.0),
    (5.0, 5.0)
], ["a", "b"])

# Imputing missing values 
imputer = Imputer(inputCols=["a", "b"], outputCols=["out_a", "out_b"])
model = imputer.fit(df)

model.transform(df).show()

+---+---+-----+-----+
|  a|  b|out_a|out_b|
+---+---+-----+-----+
|1.0|NaN|  1.0|  4.0|
|2.0|NaN|  2.0|  4.0|
|NaN|3.0|  3.0|  3.0|
|4.0|4.0|  4.0|  4.0|
|5.0|5.0|  5.0|  5.0|
+---+---+-----+-----+



In [106]:
from pyspark.ml.feature import VectorSlicer
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import Row

# Creating Sparkdataframe
df = spark.createDataFrame([
    Row(userFeatures=Vectors.sparse(3, {0: -2.0, 1: 2.3})),
    Row(userFeatures=Vectors.dense([-2.0, 2.3, 0.0]))])

# Transforming feature vector; UserFeatures and outputting a new feature vector with sub-array of the original features
slicer = VectorSlicer(inputCol="userFeatures", outputCol="features", indices=[1])

output = slicer.transform(df)

output.select("userFeatures", "features").show()

+--------------------+-------------+
|        userFeatures|     features|
+--------------------+-------------+
|(3,[0,1],[-2.0,2.3])|(1,[0],[2.3])|
|      [-2.0,2.3,0.0]|        [2.3]|
+--------------------+-------------+



In [109]:
from pyspark.ml.feature import RFormula

# Create Sparkdataframe
dataset = spark.createDataFrame(
    [(7, "US", 18, 1.0),
     (8, "CA", 12, 0.0),
     (9, "NZ", 15, 0.0)],
    ["id", "country", "hour", "clicked"])

# Selecting the columns specified by the Rformula
formula = RFormula(
    formula="clicked ~ country + hour",
    featuresCol="features",
    labelCol="label")

# Outputting the results
output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()

+--------------+-----+
|      features|label|
+--------------+-----+
|[0.0,0.0,18.0]|  1.0|
|[1.0,0.0,12.0]|  0.0|
|[0.0,1.0,15.0]|  0.0|
+--------------+-----+



In [111]:
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.linalg import Vectors

# Creating the Sparkdataframe
df = spark.createDataFrame([
    (7, Vectors.dense([0.0, 0.0, 18.0, 1.0]), 1.0,),
    (8, Vectors.dense([0.0, 1.0, 12.0, 0.0]), 0.0,),
    (9, Vectors.dense([1.0, 0.0, 15.0, 0.1]), 0.0,)], ["id", "features", "clicked"])

# Chi-Squared feature selection on cateorgical labeled data with categorical features
selector = ChiSqSelector(numTopFeatures=1, featuresCol="features",
                         outputCol="selectedFeatures", labelCol="clicked")
result = selector.fit(df).transform(df)

# Printing the result
print("ChiSqSelector output with top %d features selected" % selector.getNumTopFeatures())
result.show()

ChiSqSelector output with top 1 features selected
+---+------------------+-------+----------------+
| id|          features|clicked|selectedFeatures|
+---+------------------+-------+----------------+
|  7|[0.0,0.0,18.0,1.0]|    1.0|          [18.0]|
|  8|[0.0,1.0,12.0,0.0]|    0.0|          [12.0]|
|  9|[1.0,0.0,15.0,0.1]|    0.0|          [15.0]|
+---+------------------+-------+----------------+



In [114]:
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

# Creating Sparkdataframe 1
dataA = [(0, Vectors.dense([1.0, 1.0]),),
         (1, Vectors.dense([1.0, -1.0]),),
         (2, Vectors.dense([-1.0, -1.0]),),
         (3, Vectors.dense([-1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])

# Creating Sparkdataframe 2
dataB = [(4, Vectors.dense([1.0, 0.0]),),
         (5, Vectors.dense([-1.0, 0.0]),),
         (6, Vectors.dense([0.0, 1.0]),),
         (7, Vectors.dense([0.0, -1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])

key = Vectors.dense([1.0, 0.0])

brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=2.0,
                                  numHashTables=3)
model = brp.fit(dfA)

# Transforming features
print("The hashed dataset. Hashed values stored in the column 'hashes':")
model.transform(dfA).show()

# Computing the locality sensitive hashes for the input rows, then performing join
print("Approximately joining dfA and dfB on EuclideanDistance smaller than 1.5:")
model.approxSimilarityJoin(dfA, dfB, 1.5, distCol="EuclideanDistance")\
    .select(col("datasetA.id").alias("idA"),
            col("datasetB.id").alias("idB"),
            col("EuclideanDistance")).show()

# Computing the locality sensitive hashes for the input rows, then performing nearest neighbor search.
print("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()

The hashed dataset. Hashed values stored in the column 'hashes':
+---+-----------+--------------------+
| id|   features|              hashes|
+---+-----------+--------------------+
|  0|  [1.0,1.0]|[[-1.0], [0.0], [...|
|  1| [1.0,-1.0]|[[-1.0], [0.0], [...|
|  2|[-1.0,-1.0]|[[0.0], [-1.0], [...|
|  3| [-1.0,1.0]|[[0.0], [-1.0], [...|
+---+-----------+--------------------+

Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:
+---+---+-----------------+
|idA|idB|EuclideanDistance|
+---+---+-----------------+
|  3|  5|              1.0|
|  0|  4|              1.0|
|  0|  6|              1.0|
|  2|  7|              1.0|
|  1|  7|              1.0|
|  3|  6|              1.0|
|  2|  5|              1.0|
|  1|  4|              1.0|
+---+---+-----------------+

Approximately searching dfA for 2 nearest neighbors of the key:
+---+----------+--------------------+-------+
| id|  features|              hashes|distCol|
+---+----------+--------------------+-------+
|  0| [1.

In [116]:
from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

# Creating Sparkdataframe 1
dataA = [(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0]),),
         (1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]),),
         (2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])

# Creating Sparkdataframe 2
dataB = [(3, Vectors.sparse(6, [1, 3, 5], [1.0, 1.0, 1.0]),),
         (4, Vectors.sparse(6, [2, 3, 5], [1.0, 1.0, 1.0]),),
         (5, Vectors.sparse(6, [1, 2, 4], [1.0, 1.0, 1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])

key = Vectors.sparse(6, [1, 3], [1.0, 1.0])

mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
model = mh.fit(dfA)

# Transforming features
print("The hashed dataset. Hashed values stored in the column 'hashes':")
model.transform(dfA).show()

# Computing the locality sensitive hashes for the input rows, then performing join
print("Approximately joining dfA and dfB on distance smaller than 0.6:")
model.approxSimilarityJoin(dfA, dfB, 0.6, distCol="JaccardDistance")\
    .select(col("datasetA.id").alias("idA"),
            col("datasetB.id").alias("idB"),
            col("JaccardDistance")).show()

# Computing the locality sensitive hashes for the input rows, then performing nearest neighbor search.
print("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()

The hashed dataset where hashed values are stored in the column 'hashes':
+---+--------------------+--------------------+
| id|            features|              hashes|
+---+--------------------+--------------------+
|  0|(6,[0,1,2],[1.0,1...|[[-9.26256698E8],...|
|  1|(6,[2,3,4],[1.0,1...|[[-1.373153181E9]...|
|  2|(6,[0,2,4],[1.0,1...|[[-1.373153181E9]...|
+---+--------------------+--------------------+

Approximately joining dfA and dfB on distance smaller than 0.6:
+---+---+---------------+
|idA|idB|JaccardDistance|
+---+---+---------------+
|  1|  4|            0.5|
|  1|  5|            0.5|
|  0|  5|            0.5|
|  2|  5|            0.5|
+---+---+---------------+

Approximately searching dfA for 2 nearest neighbors of the key:
+---+--------------------+--------------------+-------+
| id|            features|              hashes|distCol|
+---+--------------------+--------------------+-------+
|  0|(6,[0,1,2],[1.0,1...|[[-9.26256698E8],...|   0.75|
|  1|(6,[2,3,4],[1.0,1...|[[

#### 1.6 Spark SQL

In [50]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [55]:
df = spark.read.format("csv").option("header", "true").load("people.csv")
df.show()

+-------+---+
|   Name|Age|
+-------+---+
|Michael| 40|
|   Andy| 30|
| Justin| 23|
+-------+---+



In [61]:
# Printing the schema in tree format
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: string (nullable = true)



In [62]:
# Selecting only the "name" column
df.select("name").show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [63]:
# Selecting everybody, incrementing Age by 1
df.select(df['name'], df['age'] + 1).show()

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     41.0|
|   Andy|     31.0|
| Justin|     24.0|
+-------+---------+



In [67]:
# Selecting people older than 23
df.filter(df['age'] > 23).show()

+-------+---+
|   Name|Age|
+-------+---+
|Michael| 40|
|   Andy| 30|
+-------+---+



In [68]:
# Counting people by age
df.groupBy("age").count().show()

+---+-----+
|age|count|
+---+-----+
| 30|    1|
| 23|    1|
| 40|    1|
+---+-----+



In [70]:
# DataFrame as temporary view
df.createOrReplaceTempView("people")

In [71]:
sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

+-------+---+
|   Name|Age|
+-------+---+
|Michael| 40|
|   Andy| 30|
| Justin| 23|
+-------+---+



In [72]:
# Registering the DataFrame as global temporary view
df.createGlobalTempView("people")

In [73]:
# Global temporary view tied to database global_temp`
spark.sql("SELECT * FROM global_temp.people").show()

+-------+---+
|   Name|Age|
+-------+---+
|Michael| 40|
|   Andy| 30|
| Justin| 23|
+-------+---+



In [74]:
# Global temporary view cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()

+-------+---+
|   Name|Age|
+-------+---+
|Michael| 40|
|   Andy| 30|
| Justin| 23|
+-------+---+



In [110]:
# Import data types
from pyspark.sql.types import *

sc = spark.sparkContext

In [111]:
# Loading a text file and convert each line to a Row.
lines = sc.textFile("people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))

In [114]:
# The schema is encoded in a string.
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

In [115]:
# Applying schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)

In [116]:
# Creating a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")

#Run SQL over DataFrames that are registered as tables.
results = spark.sql("SELECT name FROM people")


In [118]:
results.show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [227]:
# Loading and saving parquet
df = spark.read.load("C:/big-datademo/superscripts/data/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

In [127]:
# Loading and saving json
df = spark.read.load("C:/big-datademo/superscripts/data/people.json", format="json")
df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")

In [128]:
# Loading and saving CSV
df = spark.read.load("C:/big-datademo/superscripts/data/people.csv",
                     format="csv", sep=":", inferSchema="true", header="true")

In [129]:
# Running SQL on files directly
df = spark.sql("SELECT * FROM parquet.`C:/big-datademo/superscripts/data/users.parquet`")

In [138]:
# Bucketing
df.write.bucketBy(42, "name").sortBy("favorite_numbers").saveAsTable("people_bucketed")

In [141]:
# Partitioning
df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")

In [142]:
# Partitioning and bucketing on a single table
df = spark.read.parquet("C:/big-datademo/superscripts/data/users.parquet")
(df
    .write
    .partitionBy("favorite_color")
    .bucketBy(42, "name")
    .saveAsTable("people_partitioned_bucketed"))

In [148]:
peopleDF = spark.read.json("C:/big-datademo/superscripts/data/people.json")

# Save dataframes as Parquet files, maintaining the schema information.
peopleDF.write.parquet("people.parquet")

# Reading in the Parquet file created above, preserving the schema. Result=Dataframe
parquetFile = spark.read.parquet("people.parquet")

# Using parquet files to create a temporary view and then apply SQL statements.
parquetFile.createOrReplaceTempView("parquetFile")
Adults = spark.sql("SELECT name FROM parquetFile WHERE age >= 23 AND age <= 60")
Adults.show()

+----+
|name|
+----+
|Andy|
+----+



In [154]:
from pyspark.sql import Row

# Creating a simple DataFrame, stored into a partition directory
sc = spark.sparkContext

squaresDF = spark.createDataFrame(sc.parallelize(range(1, 6))
                                  .map(lambda i: Row(single=i, double=i ** 2)))
squaresDF.write.parquet("data/test_table/key=1")

# Creating another DataFrame in a new partition directory,
# adding a new column and dropping an existing column
cubesDF = spark.createDataFrame(sc.parallelize(range(6, 11))
                                .map(lambda i: Row(single=i, triple=i ** 3)))
cubesDF.write.parquet("data/test_table/key=2")

# Reading the partitioned table
mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()

root
 |-- double: long (nullable = true)
 |-- single: long (nullable = true)
 |-- triple: long (nullable = true)
 |-- key: integer (nullable = true)



In [161]:

sc = spark.sparkContext

# Pointing to a json dataset by path.
path = "C:/big-datademo/superscripts/data/people.json"
peopleDF = spark.read.json(path)

# Visualizing the inferred schema
peopleDF.printSchema()

# Creating a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

# Running SQL statements by using sql methods provided by spark
AdultNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 23 AND 60")
AdultNamesDF.show()

# Alternatively, creating a dataframe for a JSON dataset represented by
# an RDD[String] storing one JSON object per string
jsonStrings = ['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']
otherPeopleRDD = sc.parallelize(jsonStrings)
otherPeople = spark.read.json(otherPeopleRDD)
otherPeople.show()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

+----+
|name|
+----+
|Andy|
+----+

+----------------+----+
|         address|name|
+----------------+----+
|[Columbus, Ohio]| Yin|
+----------------+----+



In [209]:
from os.path import expanduser, join, abspath

from pyspark.sql import SparkSession
from pyspark.sql import Row

# Pointing warehouse location to the default location for managed databases and tables
warehouse_location = abspath('spark-warehouse')

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()

# spark as existing SparkSession
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH 'kv1.csv' INTO TABLE src")

# Queries in HiveQL
spark.sql("SELECT * FROM src").show()


+---+-------+
|key|  value|
+---+-------+
|238|val_238|
| 86| val_86|
|311|val_311|
| 27| val_27|
|165|val_165|
|409|val_409|
|255|val_255|
|278|val_278|
| 98| val_98|
|484|val_484|
|265|val_265|
|193|val_193|
|401|val_401|
|150|val_150|
|273|val_273|
|224|val_224|
|369|val_369|
| 66| val_66|
|128|val_128|
|213|val_213|
+---+-------+
only showing top 20 rows



In [210]:
# Aggregation queries 
spark.sql("SELECT COUNT(*) FROM src").show()

+--------+
|count(1)|
+--------+
|    3000|
+--------+



In [211]:
# The dataFrames as result of SQL queries all normal functions.
sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

# The items in DataFrames of type Row, accessing each column by ordinal
stringsDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value))
for record in stringsDS.collect():
    print(record)

Key: 0, Value: val_0
Key: 0, Value: val_0
Key: 0, Value: val_0
Key: 0, Value: None
Key: 0, Value: None
Key: 0, Value: None
Key: 0, Value: val_0
Key: 0, Value: val_0
Key: 0, Value: val_0
Key: 0, Value: val_0
Key: 0, Value: val_0
Key: 0, Value: val_0
Key: 0, Value: val_0
Key: 0, Value: val_0
Key: 0, Value: val_0
Key: 0, Value: val_0
Key: 0, Value: val_0
Key: 0, Value: val_0
Key: 2, Value: val_2
Key: 2, Value: None
Key: 2, Value: val_2
Key: 2, Value: val_2
Key: 2, Value: val_2
Key: 2, Value: val_2
Key: 4, Value: val_4
Key: 4, Value: None
Key: 4, Value: val_4
Key: 4, Value: val_4
Key: 4, Value: val_4
Key: 4, Value: val_4
Key: 5, Value: val_5
Key: 5, Value: val_5
Key: 5, Value: val_5
Key: 5, Value: None
Key: 5, Value: None
Key: 5, Value: None
Key: 5, Value: val_5
Key: 5, Value: val_5
Key: 5, Value: val_5
Key: 5, Value: val_5
Key: 5, Value: val_5
Key: 5, Value: val_5
Key: 5, Value: val_5
Key: 5, Value: val_5
Key: 5, Value: val_5
Key: 5, Value: val_5
Key: 5, Value: val_5
Key: 5, Value: val_5


In [212]:
# Using DataFrames to create temporary views within a SparkSession.
Record = Row("key", "value")
recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)])
recordsDF.createOrReplaceTempView("records")

# Joining DataFrame data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()

+---+-----+---+-----+
|key|value|key|value|
+---+-----+---+-----+
|  2|val_2|  2|val_2|
|  2|val_2|  2|val_2|
|  2|val_2|  2|val_2|
|  2|val_2|  2|val_2|
|  2|val_2|  2| null|
|  2|val_2|  2|val_2|
|  4|val_4|  4|val_4|
|  4|val_4|  4|val_4|
|  4|val_4|  4|val_4|
|  4|val_4|  4|val_4|
|  4|val_4|  4| null|
|  4|val_4|  4|val_4|
|  5|val_5|  5|val_5|
|  5|val_5|  5|val_5|
|  5|val_5|  5|val_5|
|  5|val_5|  5|val_5|
|  5|val_5|  5|val_5|
|  5|val_5|  5|val_5|
|  5|val_5|  5|val_5|
|  5|val_5|  5|val_5|
+---+-----+---+-----+
only showing top 20 rows



#### 1.7 Machine learning Pipelines

In [15]:
#Importing the neccesary modules/packages
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import SparkSession                            

In [18]:
# Building the session 
if __name__ == "__main__":
   spark = SparkSession\
       .builder\
       .appName("PipelineExample")\
.getOrCreate()

In [35]:
# Preparing the training documents from a list of id, text and label tuples
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])                            


In [36]:
 # Configuring an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])                            

In [37]:
# Fitting the pipeline to the training dataframe
model = pipeline.fit(training)                            

In [38]:
# Preparing the test dataframe
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])

In [39]:
# Prediction on the test set and making predictions
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
     rid, text, prob, prediction = row
     print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), prediction))

(4, spark i j k) --> prob=[0.159640773879,0.840359226121], prediction=1.000000
(5, l m n) --> prob=[0.837832568548,0.162167431452], prediction=0.000000
(6, spark hadoop spark) --> prob=[0.0692663313298,0.93073366867], prediction=1.000000
(7, apache hadoop) --> prob=[0.982157533344,0.0178424666556], prediction=0.000000


#### 1.8 Correlation


In [20]:
#Importing the neccesary modules/packages
from __future__ import print_function
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import Correlation
from pyspark.sql import SparkSession

In [21]:
# Creating the sc
sc = SparkContext.getOrCreate()

In [22]:
# Building the session
if __name__ == "__main__":
     spark = SparkSession \
          .builder \
          .appName("CorrelationExample") \
.getOrCreate()

In [23]:
 # create the dataframe
data = [(Vectors.sparse(4, [(0, 1.0), (3, -2.0)]),),
            (Vectors.dense([4.0, 5.0, 0.0, 3.0]),),
            (Vectors.dense([6.0, 7.0, 0.0, 8.0]),),
             (Vectors.sparse(4, [(0, 9.0), (3, 1.0)]),)]
df = spark.createDataFrame(data, ["features"])

In [24]:
# Producing the Pearson correlation matrix
r1 = Correlation.corr(df, "features").head()
print("Pearson correlation matrix:\n" + str(r1[0]))

Pearson correlation matrix:
DenseMatrix([[ 1.        ,  0.05564149,         nan,  0.40047142],
             [ 0.05564149,  1.        ,         nan,  0.91359586],
             [        nan,         nan,  1.        ,         nan],
             [ 0.40047142,  0.91359586,         nan,  1.        ]])


In [25]:
# Producing the Spearman correlation matrix
r2 = Correlation.corr(df, "features", "spearman").head()
print("Spearman correlation matrix:\n" + str(r2[0]))

Spearman correlation matrix:
DenseMatrix([[ 1.        ,  0.10540926,         nan,  0.4       ],
             [ 0.10540926,  1.        ,         nan,  0.9486833 ],
             [        nan,         nan,  1.        ,         nan],
             [ 0.4       ,  0.9486833 ,         nan,  1.        ]])


#### 1.9 Logistic regression

In [25]:
# Importing the neccesary modules/packages
from __future__ import print_function
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import SparkSession

In [51]:
# Building a new session
if __name__ == "__main__":
     spark = SparkSession\
          .builder\
          .appName("LogisticRegressionWithElasticNet")\
          .getOrCreate()

In [52]:
# Loading the training data
training = spark.read.format("libsvm").load("libsvm.txt")

lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

In [53]:
# Fit the model
lrModel = lr.fit(training)

In [54]:
# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))

Coefficients: (692,[244,263,272,300,301,328,350,351,378,379,405,406,407,428,433,434,455,456,461,462,483,484,489,490,496,511,512,517,539,540,568],[-7.35398352419e-05,-9.10273850559e-05,-0.000194674305469,-0.000203006424735,-3.14761833149e-05,-6.84297760266e-05,1.58836268982e-05,1.40234970914e-05,0.00035432047525,0.000114432728982,0.000100167123837,0.00060141093038,0.000284024817912,-0.000115410847365,0.000385996886313,0.000635019557424,-0.000115064123846,-0.00015271865865,0.000280493380899,0.000607011747119,-0.000200845966325,-0.000142107557929,0.000273901034116,0.00027730456245,-9.83802702727e-05,-0.000380852244352,-0.000253151980086,0.000277477147708,-0.000244361976392,-0.00153947446876,-0.000230733284113])
Intercept: 0.22456315961250325


In [55]:
# Multinominal method for binary classification
mlr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial")

In [56]:
# Fitting the model
mlrModel = mlr.fit(training)

In [57]:
# Printing the coefficients and intercepts for logistic regression with multinomial family
print("Multinomial coefficients: " + str(mlrModel.coefficientMatrix))
print("Multinomial intercepts: " + str(mlrModel.interceptVector))

Multinomial coefficients: 2 X 692 CSRMatrix
(0,244) 0.0
(0,263) 0.0001
(0,272) 0.0001
(0,300) 0.0001
(0,350) -0.0
(0,351) -0.0
(0,378) -0.0
(0,379) -0.0
(0,405) -0.0
(0,406) -0.0006
(0,407) -0.0001
(0,428) 0.0001
(0,433) -0.0
(0,434) -0.0007
(0,455) 0.0001
(0,456) 0.0001
..
..
Multinomial intercepts: [-0.120658794459,0.120658794459]


In [58]:
from pyspark.ml.classification import LogisticRegression

# Extract the summary from the LogisticRegressionModel trained earlier
trainingSummary = lrModel.summary

In [59]:
# Obtaining the objective per iteration and printing the result
objectiveHistory = trainingSummary.objectiveHistory
print("objectiveHistory:")
for objective in objectiveHistory:
     print(objective)

objectiveHistory:
0.6833149135741672
0.6662875751473734
0.6217068546034618
0.6127265245887887
0.6060347986802873
0.6031750687571562
0.5969621534836274
0.5940743031983118
0.5906089243339022
0.5894724576491042
0.5882187775729587


In [60]:
 # Printing the receiver-operating characteristic as a dataframe and the areaUnderROC
trainingSummary.roc.show()
print("areaUnderROC: " + str(trainingSummary.areaUnderROC))

+---+--------------------+
|FPR|                 TPR|
+---+--------------------+
|0.0|                 0.0|
|0.0|0.017543859649122806|
|0.0| 0.03508771929824561|
|0.0| 0.05263157894736842|
|0.0| 0.07017543859649122|
|0.0| 0.08771929824561403|
|0.0| 0.10526315789473684|
|0.0| 0.12280701754385964|
|0.0| 0.14035087719298245|
|0.0| 0.15789473684210525|
|0.0| 0.17543859649122806|
|0.0| 0.19298245614035087|
|0.0| 0.21052631578947367|
|0.0| 0.22807017543859648|
|0.0| 0.24561403508771928|
|0.0|  0.2631578947368421|
|0.0|  0.2807017543859649|
|0.0|  0.2982456140350877|
|0.0|  0.3157894736842105|
|0.0|  0.3333333333333333|
+---+--------------------+
only showing top 20 rows

areaUnderROC: 1.0


In [61]:
 # Setting the model threshold to maximize F-Measure
fMeasure = trainingSummary.fMeasureByThreshold
maxFMeasure = fMeasure.groupBy().max('F-Measure').select('max(F-Measure)').head()
bestThreshold = fMeasure.where(fMeasure['F-Measure'] == maxFMeasure['max(F-Measure)']) \
          .select('threshold').head()['threshold']
lr.setThreshold(bestThreshold)

LogisticRegression_4c878e704f2ac6767c63

In [62]:
 # Printing the f measure results
fMeasure.show()

+------------------+--------------------+
|         threshold|           F-Measure|
+------------------+--------------------+
|0.7845860015371142|0.034482758620689655|
|0.7843193344168922| 0.06779661016949151|
|0.7842976092510131|                 0.1|
|0.7842531051133191| 0.13114754098360656|
|0.7835792429453297| 0.16129032258064516|
|0.7835223585829078|  0.1904761904761905|
| 0.783284563364102|             0.21875|
|0.7832449070254992| 0.24615384615384614|
|0.7830630257264691|  0.2727272727272727|
|0.7830068256743365| 0.29850746268656714|
|0.7822341175907138|  0.3235294117647059|
| 0.782111826902122| 0.34782608695652173|
| 0.781220790993743|  0.3714285714285714|
|0.7802700864854707|  0.3943661971830986|
|0.7789683616171501|  0.4166666666666667|
|0.7789606764592472|  0.4383561643835616|
|0.7788060694625324| 0.45945945945945943|
|0.7783754276111222|  0.4799999999999999|
|0.7771658291080574|                 0.5|
|0.7769914303593917|  0.5194805194805194|
+------------------+--------------

In [63]:
 # Printing the max FMeasure
maxFMeasure

Row(max(F-Measure)=1.0)

In [64]:
 # Printing the best Threshold
bestThreshold

0.5585022394278357

#### 1.10 Reccomendation systems

In [73]:
# Importing the neccesary modules/packages
from __future__ import print_function
import sys
if sys.version >= '3':long = int
    
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

In [74]:
 # Building a new session
if __name__ == "__main__":
     spark = SparkSession\
          .builder\
.appName("ALSExample")\
.getOrCreate()

In [75]:
 # Creating the dataframe
rdd = sc.textFile('ratings2.csv')
rdd = rdd.map(lambda line: line.split(","))

from pyspark.sql import Row

df= rdd.map(lambda line: Row(userId=line[0], 
movieId=line[1],
rating=line[2], 
timestamp=line[3])).toDF()

In [76]:
 # Show the dataframe
df.show(5)

+-------+------+----------+------+
|movieId|rating| timestamp|userId|
+-------+------+----------+------+
|      2|   3.5|1112486027|     1|
|     29|   3.5|1112484676|     1|
|     32|   3.5|1112484819|     1|
|     47|   3.5|1112484727|     1|
|     50|   3.5|1112484580|     1|
+-------+------+----------+------+
only showing top 5 rows



In [77]:
 # Casting the features/variables to integers
from pyspark.sql.types import IntegerType
df=df.withColumn("userId", df["userId"].cast(IntegerType()))
df=df.withColumn("movieId", df["movieId"].cast(IntegerType()))
df=df.withColumn("rating", df["rating"].cast(IntegerType()))
df=df.withColumn("timestamp", df["timestamp"].cast(IntegerType()))

In [78]:
 # Splitting the train and test in 80/20
(training, test) = df.randomSplit([0.8, 0.2])

In [80]:
# Building the recommendation model using ALS on the train data
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
                      coldStartStrategy="drop")
model = als.fit(training)

In [81]:
# Evaluate the model by printing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                                   predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 0.9332826538500137


In [82]:
# Generating top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generating top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)

In [83]:
#Printing the user recommendations
userRecs.show()    

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|  1580|[[7241, 17.19252]...|
|  4900|[[4282, 10.177976...|
|  5300|[[7058, 6.7370977...|
|  6620|[[67665, 8.95527]...|
|   471|[[96911, 8.252678...|
|  1591|[[43744, 10.89861...|
|  4101|[[6813, 7.4541287...|
|  1342|[[4282, 13.613807...|
|  2122|[[6339, 10.289984...|
|  2142|[[5174, 11.00116]...|
|   463|[[67665, 9.871851...|
|   833|[[72714, 13.74275...|
|  5803|[[32525, 8.633098...|
|  3794|[[8527, 13.211122...|
|  6654|[[3847, 11.462468...|
|  1645|[[75341, 9.10404]...|
|  3175|[[93270, 13.23678...|
|  4935|[[72714, 9.80378]...|
|   496|[[1529, 7.6244574...|
|  2366|[[72714, 12.70508...|
+------+--------------------+
only showing top 20 rows



In [84]:
#Printing the movie recommendations
movieRecs.show()

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|   1580|[[4747, 6.1958365...|
|   4900|[[2559, 12.581707...|
|   5300|[[6322, 14.011656...|
|   6620|[[5361, 7.4421635...|
|   7240|[[3220, 9.055042]...|
|   7340|[[4747, 11.626975...|
|   7880|[[992, 5.989811],...|
|  32460|[[2940, 13.113006...|
|  54190|[[1247, 8.309697]...|
|  57370|[[1773, 5.8368864...|
|    471|[[3159, 7.8786597...|
|   1591|[[1229, 7.117619]...|
|   4101|[[6240, 12.982417...|
|  80451|[[3159, 3.1412215...|
|   1342|[[999, 9.297892],...|
|   2122|[[2559, 9.091963]...|
|   2142|[[5361, 8.768823]...|
|   7982|[[4143, 8.795311]...|
|  33722|[[1773, 9.470557]...|
|  44022|[[6427, 8.61894],...|
+-------+--------------------+
only showing top 20 rows

