In [1]:
import pyspark as ps

# Start a local Spark session using 4 worker threads (or reuse one that already
# exists in this kernel), and keep a handle to its SparkContext for the RDD API.
spark = (
    ps.sql.SparkSession.builder
    .master('local[4]')
    .appName('spark-lecture')
    .getOrCreate()
)

sc = spark.sparkContext

ModuleNotFoundError: No module named 'pyspark'

In [0]:
import random

n = 100  # number of simulated coin flips

# Distribute the integers 0..n-1 as an RDD; each element stands for one flip.
# The element's value is ignored (hence the `_` parameter) and replaced by a
# uniform random number in [0, 1); numbers below 0.5 count as heads.
# Python 3 has no `xrange`; `range` is the lazy equivalent.
# No seed is set, so counts differ from run to run.
heads = (sc.parallelize(range(n))
         .map(lambda _: random.random())
         .filter(lambda r: r < 0.5)
         .count())

tails = n - heads
ratio = heads / n  # `/` is true division in Python 3, so no `1. *` trick is needed

print("heads: ", heads)
print("tails: ", tails)
print("ratio: ", ratio)


In [0]:
def is_prime(number):
    """Return True if ``number`` is a prime integer, False otherwise.

    Uses trial division by every candidate factor ``f`` with ``f * f <= number``.
    This integer bound is exact, unlike a float square root.
    Values below 2 (0, 1 and negative numbers) are not prime.
    """
    if number < 2:
        return False
    factor = 2
    while factor * factor <= number:
        if number % factor == 0:
            return False
        factor += 1
    return True

In [0]:
numbers = range(2, 100)  # candidates 2..99 (Python 3 `range`; `xrange` no longer exists)

# Keep only the elements for which is_prime returns True, then collect them on the driver.
primes = (sc.parallelize(numbers)
          .filter(is_prime)
          .collect())

print(primes)


# Step-by-step variant with an extra filter (each call returns a new RDD):
# rdd = sc.parallelize(numbers)
# rdd2 = rdd.filter(is_prime)
# rdd3 = rdd2.filter(lambda x: x > 5)   # additionally keep only primes greater than 5
# rdd3.collect()                        # collects the results as a Python list


In [0]:
sc.parallelize([1, 3, 2, 3, 2, 4, 5]).distinct().sortBy(lambda x: x).collect()   # distinct() drops duplicates but its order depends on partitioning; sorting makes the output [1, 2, 3, 4, 5] reproducible

In [0]:
sc.parallelize([1, 3, 2, 3, 2]).sortBy(lambda value: value, ascending=False).collect()   # sort by the value itself, largest first, duplicates kept -> [3, 3, 2, 2, 1]

In [0]:
## MAP AND FLAT MAP

In [0]:
%%writefile input.txt         #file name
hello world
another line
yet another line
yet another another line

In [0]:
rdd = sc.textFile('input.txt')   # read the text file into an RDD with one element per line
print(rdd.collect())             # only a cell's last expression is echoed, so print this one explicitly

# map() produces exactly one output per input element: each line becomes the list
# of its whitespace-separated words, so the result is a list of lists.
rdd2 = rdd.map(lambda x: x.split())   # a transformation: returns a new RDD
rdd2.collect()                        # e.g. [['hello', 'world'], ['another', 'line'], ...]

In [0]:
# flatMap() flattens each line's word list into a single flat list of words,
# e.g. ['hello', 'world', 'another', 'line', ...] instead of a list of lists.
rdd3 = rdd.flatMap(lambda line: line.split())
rdd3.collect()

In [0]:
%%writefile sales.txt
# writes something here

In [0]:
lines = sc.textFile('sales.txt')   # one RDD element per line of the file

# top(n) returns the n *largest* elements in descending order (for strings: by
# lexicographic order). It does NOT return the first rows of the file.
print(lines.top(2))
# take(n) returns the first n elements in file order, header line(s) included.
print(lines.take(2))

# Split each line on whitespace so every row becomes a list of fields.
lines.map(lambda x: x.split()).take(2)

In [0]:
# Skip the header rows (their first field starts with '«') and show the first 2 data rows.
# Blank lines split into an empty list; the leading `x and` drops them instead of
# raising IndexError on x[0].
(sc.textFile('sales.txt')
    .map(lambda x: x.split())
    .filter(lambda x: x and not x[0].startswith('«'))
    .take(2))

# Without `not`, the filter would keep only the rows starting with '«'
# (and take(2) would return at most 2 of them).

In [0]:
# First 2 values of the last column, after skipping header rows (first field starts
# with '«') and blank lines.
(sc.textFile('sales.txt')
    .map(lambda x: x.split())
    .filter(lambda x: x and not x[0].startswith('«'))
    .map(lambda x: x[-1])
    .take(2))            # outputs a list of 2 strings

In [0]:
# First 2 values of the last column converted to float so they can be summed
# (textFile yields strings). Header rows (first field starts with '«') and blank
# lines are skipped.
(sc.textFile('sales.txt')
    .map(lambda x: x.split())
    .filter(lambda x: x and not x[0].startswith('«'))
    .map(lambda x: float(x[-1]))
    .take(2))            # outputs a list of 2 floats

In [0]:
# Build (key, amount) pairs: the 4th field x[3] (presumably the state) as the key
# and the last field as a float amount. Header rows and blank lines are skipped.
(sc.textFile('sales.txt')
    .map(lambda x: x.split())
    .filter(lambda x: x and not x[0].startswith('«'))
    .map(lambda x: (x[3], float(x[-1])))
    .collect())          # outputs a list of (key, amount) tuples

In [0]:
# Total amount per key: reduceByKey merges the values that share a key, two at a
# time, with the given function (here addition), giving one (key, total) pair per key.
(sc.textFile('sales.txt')
    .map(lambda x: x.split())
    .filter(lambda x: x and not x[0].startswith('«'))
    .map(lambda x: (x[3], float(x[-1])))
    .reduceByKey(lambda amount1, amount2: amount1 + amount2)
    .collect())          # outputs a list of (key, total) tuples

In [0]:
# Total amount per key (reduceByKey adds up all amounts that share a key), then
# sorted by the total (the second tuple element), largest first.
(sc.textFile('sales.txt')
    .map(lambda x: x.split())
    .filter(lambda x: x and not x[0].startswith('«'))
    .map(lambda x: (x[3], float(x[-1])))
    .reduceByKey(lambda amount1, amount2: amount1 + amount2)
    .sortBy(lambda state_amount: state_amount[1], ascending=False)
    .collect())

In [0]:
## SPARK ML: chain transformers and estimators together into ML pipelines

In [0]:
import pyspark.sql.functions as F
import pyspark as ps
from pyspark import SQLContext

In [0]:
# Get a Spark session for the ML part.
# NOTE(review): if a session already exists in this kernel (e.g. the
# 'spark-lecture' one built at the top of the notebook), getOrCreate() returns it,
# and the master/appName given here may not take effect.
spark = (ps.sql.SparkSession.builder
         .master('local[2]')
         .appName('spark-ml')
         .getOrCreate())

sc = spark.sparkContext

# SQLContext wraps the SparkContext and exposes the SQL/DataFrame API (e.g. .read).
# It is the legacy entry point; spark.read offers the same reader on the session.
sqlContext = SQLContext(sc)

In [0]:
# Load data/aapl.csv into a DataFrame: the first line holds the column names,
# fields are comma-separated with '"' as the quote character, and Spark infers
# each column's type from the data.
df_aapl = sqlContext.read.csv(
    'data/aapl.csv',
    header=True,
    quote='"',
    sep=',',
    inferSchema=True,
)

df_aapl.show(5)   # print the first 5 rows as a text table (like pandas df.head(5))

In [0]:
df_aapl.schema      # the DataFrame's schema: each column's name and data type (closest pandas analogue: df.info(); df_aapl.printSchema() prints it as a tree)

In [0]:
# Keep only the Date and Close columns, ordered by Close from highest to lowest
# (this sorts the rows; it does not group them).
df_out = (df_aapl
          .select('Date', 'Close')
          .orderBy(df_aapl['Close'].desc()))
df_out.show(5)

In [0]:
from pyspark.ml.feature import MinMaxScaler, VectorAssembler

# VectorAssembler is a transformer: it packs the listed input columns into one
# vector-valued column (here a 1-element vector holding 'Close').
# The keyword is `outputCol` (singular); `outputCols` is not a parameter and raises a TypeError.
vectorAssembler = VectorAssembler(inputCols=['Close'], outputCol='Features')

df_vector = vectorAssembler.transform(df_aapl)   # a new DataFrame; df_aapl itself is unchanged
df_aapl.show(5)                                  # shows the original table
df_vector.show(5)                                # shows the same columns plus 'Features'

In [0]:
# MinMaxScaler is an estimator: it rescales each feature linearly into [min, max]
# (default [0, 1]). This is min-max rescaling, not standardization.
scaler = MinMaxScaler(inputCol="Features", outputCol="Scaled Features")

# fit() computes the summary statistics (per-feature min/max) and returns a MinMaxScalerModel.
scaler_model = scaler.fit(df_vector)

# Transform with the fitted model `scaler_model` (there is no `scaler.model` attribute).
scalar_data = scaler_model.transform(df_vector)
scalar_data.select("Features", "Scaled Features").show(5)

In [0]:
## TRANSFORMERS, ESTIMATORS, PIPELINES

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression               #estimator
from pyspark.ml.feature import RegexTokenizer, HashingTF

In [0]:
# Training documents as (id, text, label) rows; in this toy data the label is 1.0
# for the texts that mention "hadoop" and 0.0 otherwise.
training_rows = [
    (0, "spark is like hadoop mapreduce", 1.0),
    (1, "sparks light fire!!!", 0.0),
    (2, "elephants like simba", 0.0),
    (3, "hadoop is an elephant", 1.0),
    (4, "hadoop mapreduce", 1.0),
]
training = spark.createDataFrame(training_rows, schema=['id', 'text', 'label'])

In [0]:
# Transformers: RegexTokenizer splits `text` on the regex \W (any non-word character,
# i.e. whitespace and punctuation) into a `tokens` column; HashingTF hashes the tokens
# into a term-frequency vector in `features`.
regexTokenizer = RegexTokenizer(inputCol='text', outputCol="tokens", pattern="\\W")
hashingTF = HashingTF(inputCol="tokens", outputCol="features")
# Estimator: logistic regression with at most 10 iterations and regularization strength 0.001.
lr = LogisticRegression(maxIter=10, regParam=0.001)

# Apply the transformers one after another, then fit the estimator on the result.
tokens = regexTokenizer.transform(training)
hashes = hashingTF.transform(tokens)
logistic_model = lr.fit(hashes)   # reads the 'features' and 'label' columns by default

In [0]:
# Unlabeled test documents as (id, text) rows.
test_rows = [
    (5, "simba has a spark"),
    (6, "hadoop"),
    (7, "mapreduce in spark"),
    (8, "apache hadoop"),
]
test = spark.createDataFrame(test_rows, schema=['id', 'text'])

# To predict, the test data must pass through the same transformers as the training
# data (tokenize, then hash) before the fitted model can score it.
test_tokens = regexTokenizer.transform(test)
test_features = hashingTF.transform(test_tokens)
preds = logistic_model.transform(test_features)
preds.select('text', 'prediction', 'probability').show()

In [0]:
# cols=list(df.columns)
#I = [2, 5, 7, 8, 10, 14]
#new_cols = np.delete(cols, I).tolist()
#df = df[new_cols]