Few words about Spark

* **SparkContext** = connection to the cluster
* **SparkSession** = interface with that connection

* **RDD** = Resilient Distributed DataFrames. Low level, more complicated to work with
* **Spark df** = easier to understand and work with 


# Spark outside Proximus

In [None]:
# Import SparkSession from pyspark.sql
from pyspark.sql import SparkSession

# Create my_spark
my_spark = SparkSession.builder.getOrCreate()

# Print my_spark
print(my_spark)

In [None]:
# Print the tables in the catalog
print(spark.catalog.listTables())

In [None]:
# Don't change this query
query = "FROM flights SELECT * LIMIT 10"

# Get the first 10 rows of flights
flights10 = spark.sql(query)

# Show the results
flights10.show()

pandas_df = flights10.toPandas()

In [None]:
# Create pd_temp
pd_temp = pd.DataFrame(np.random.random(10))

# Create spark_temp from pd_temp
spark_temp = spark.createDataFrame(pd_temp)

# Examine the tables in the catalog
print(spark.catalog.listTables())

# Add spark_temp to the catalog
spark_temp.createOrReplaceTempView("temp")

# Examine the tables in the catalog again
print(spark.catalog.listTables())

In [None]:
# Don't change this file path
file_path = "/usr/local/share/datasets/airports.csv"

# Read in the airports data
airports = spark.read.csv(file_path, header=True)

# Show the data
airports.show()

In [None]:
# Create the DataFrame flights
flights = spark.table("flights")

# Show the head
print(flights.show())

# Add duration_hrs
flights = flights.withColumn("duration_hrs", flights.air_time/60)

long_flights1 = flights.filter("distance > 1000")

# Select the first set of columns
selected1 = flights.select("tailnum", "origin", "dest")

flights.filter(flights.origin == "PDX").groupBy().min("distance").show()

In [None]:
# Define avg_speed
avg_speed = (flights.distance/(flights.air_time/60)).alias("avg_speed")

# Select the correct columns
speed1 = flights.select("origin", "dest", "tailnum", avg_speed)

# Create the same table using a SQL expression
speed2 = flights.selectExpr("origin", "dest", "tailnum", "distance/(air_time/60) as avg_speed")

In [None]:
# Import pyspark.sql.functions as F
import pyspark.sql.functions as F

# Group by month and dest
by_month_dest = flights.groupBy("month", "dest")

# Standard deviation
by_month_dest.agg(F.stddev("dep_delay")).show()

# Spark context initialisation

In [42]:
#kinit command in Python
from subprocess import Popen, PIPE
import getpass
userid=getpass.getuser()
password = getpass.getpass()
kinit = '/usr/bin/kinit'
kinit_args = [ kinit, '%s' % (userid) ]
kinit_args = [ kinit ]
kinit = Popen(kinit_args, stdin=PIPE, stdout=PIPE, stderr=PIPE,universal_newlines=True)
kinit.stdin.write('%s\n' % password)
#kinit.wait()
out,err = kinit.communicate()

········


In [43]:
from bdpcommon.runspark import Runspark
from functools import reduce

config = {'spark.executor.instances' : 8,
            'spark.executor.memory' : '2g',
            'spark.driver.memory' : '2g',
            'spark.executor.cores' : '4',
          'spark.app.name':'TESTID860112',
          'spark.yarn.queue': 'DEV',
          'spark.port.maxRetries': 100}
    
    
#rp = Runspark(cluster_name="dev-el3207", conf = config , spark_version="2.3.0")


# rp = Runspark(cluster_name="prod-datalake", conf = config ,spark_version="1.6.3") 
rp = Runspark(cluster_name="prod-datalake", conf = config ,spark_version="2.3.0") 


 @       Welcome to  Proximus      __     __  @
 @    ____              __        /  \   /  \ @
 @   / __/__  ___ _____/ /__      \___|_|___/ @
 @  _\ \/ _ \/ _ `/ __/  '_/       ___|_|___  @
 @ /__ / .__/\_,_/_/ /_/\_\       /   | |   \ @
 @    /_/                         \__/   \__/ @
      version 2.3.0.2.6.5.0-292

Using Python version 2.7.15 (default, Nov 29 2018 06:43:57)
Spark session available as <runspark_object>.spark,
SparkContext available as <runspark_object>.sc,
SQLContext available as <runspark_object>.sqlCtx.


In [44]:
sc = rp.sc

from pyspark.sql import SQLContext

sqlContext = SQLContext(rp.sc)
hiveContext = rp.sqlContext 

In [45]:
import pyspark.sql.functions as F
from pyspark.sql.functions import col, when, isnull

from pyspark.sql import DataFrame

# Spark 

## RDD

In [6]:
sc.version

u'2.3.0.2.6.5.0-292'

In [7]:
sc.pythonVer

'2.7'

In [8]:
sc.master

u'yarn-client'

In [65]:
data = [1,2,5,8,10,11]
rdd = sc.parallelize(data)
type(rdd)

pyspark.rdd.RDD

In [55]:
print(rdd.collect())
print(rdd.take(2))
print(rdd.first())
print(rdd.count())

[1, 2, 5, 8, 10, 11]
[1, 2]
1
6


In [56]:
rdd.map(lambda x : x+2).collect()

[3, 4, 7, 10, 12, 13]

In [57]:
rdd.getNumPartitions()

32

In [58]:
fileRDD = sc.textFile("Data/text.txt")
# fileRDD.getNumPartitions()

In [75]:
print(map(lambda x: x**2, data))
print(rdd.map(lambda x:x**2).collect())

[1, 4, 25, 64, 100, 121]
[1, 4, 25, 64, 100, 121]


In [74]:
print(filter(lambda x: x%2 == 0, data))
print(rdd.filter(lambda x:x%2==0).collect())

[2, 8, 10]
[2, 8, 10]


In [76]:
print(reduce(lambda x,y: x+y, data))
print(rdd.reduce(lambda x,y: x+y))

37
37


In [80]:
rdd = sc.parallelize(["Hello world how are you", "Ben"])
rdd.flatMap(lambda x:x.split()).collect()

['Hello', 'world', 'how', 'are', 'you', 'Ben']

## Pair RDD

In [88]:
x = [("ben",29), ("noumi", 23), ("ben", 40)]
paired_rdd = sc.parallelize(x)
print(paired_rdd.collect())
print(paired_rdd.reduceByKey(lambda x,y : x + y).collect())

[('ben', 29), ('noumi', 23), ('ben', 40)]
[('ben', 69), ('noumi', 23)]


In [92]:
paired_rdd.reduceByKey(lambda x,y : x + y)\
    .map(lambda x: (x[1], x[0]))\
    .sortByKey(ascending=False)\
    .collect()

[(69, 'ben'), (23, 'noumi')]

In [93]:
RDD1 = sc.parallelize([("Messi", 34),("Ronaldo", 32),("Neymar", 24)])
RDD2 = sc.parallelize([("Ronaldo", 80),("Neymar", 120),("Messi", 100)])

RDD1.join(RDD2).collect()

[('Neymar', (24, 120)), ('Ronaldo', (32, 80)), ('Messi', (34, 100))]

In [96]:
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])
for kee, val in rdd.countByKey().items():
    print(kee, val)

('a', 2)
('b', 1)


In [97]:
sc.parallelize([(1, 2), (3, 4)]).collectAsMap()

{1: 2, 3: 4}

## Spark df

### Dataframe API

In [101]:
iphones_RDD = sc.parallelize([
    ("XS", 2018, 5.65, 2.79, 6.24),
    ("XR", 2018, 5.94, 2.98, 6.84),
    ("X10", 2017, 5.65, 2.79, 6.13),
    ("8Plus", 2017, 6.23, 3.07, 7.12)
    ])

names = [ 'Model',
    'Year',
    'Height',
    'Width',
    'Weight'
    ]

iphones_df = sqlContext.createDataFrame(iphones_RDD, schema=names)

print(type(iphones_df))
print(iphones_df.collect())
print(iphones_df.show())

<class 'pyspark.sql.dataframe.DataFrame'>
[Row(Model=u'XS', Year=2018, Height=5.65, Width=2.79, Weight=6.24), Row(Model=u'XR', Year=2018, Height=5.94, Width=2.98, Weight=6.84), Row(Model=u'X10', Year=2017, Height=5.65, Width=2.79, Weight=6.13), Row(Model=u'8Plus', Year=2017, Height=6.23, Width=3.07, Weight=7.12)]
+-----+----+------+-----+------+
|Model|Year|Height|Width|Weight|
+-----+----+------+-----+------+
|   XS|2018|  5.65| 2.79|  6.24|
|   XR|2018|  5.94| 2.98|  6.84|
|  X10|2017|  5.65| 2.79|  6.13|
|8Plus|2017|  6.23| 3.07|  7.12|
+-----+----+------+-----+------+

None


In [108]:
df_csv = sqlContext.read.csv("Playground/Data/data.csv", header=True, inferSchema=True)

# error because it looks on the datalake, not current directory

AnalysisException: u'Path does not exist: hdfs://DATALAKEHA/user/id860112/Playground/Data/data.csv;'

In [110]:
import pandas as pd

df = pd.read_csv("Data/data.csv")

df["Type 2"].fillna(" ", inplace=True)
df["Type"] = df["Type 1"] + "-" + df["Type 2"]
df.drop(["Type 1", "Type 2"], axis=1, inplace=True)
df.head()

Unnamed: 0,num,Name,Total,HP,Attack,Defense,Sp. Atk,Sp. Def,Speed,Generation,Legendary,Type
0,1,Bulbasaur,318,45,49,49,65,65,45,1,False,Grass-Poison
1,2,Ivysaur,405,60,62,63,80,80,60,1,False,Grass-Poison
2,3,Venusaur,525,80,82,83,100,100,80,1,False,Grass-Poison
3,3,VenusaurMega Venusaur,625,80,100,123,122,120,80,1,False,Grass-Poison
4,4,Charmander,309,39,52,43,60,50,65,1,False,Fire-


In [111]:
sdf = sqlContext.createDataFrame(df)
sdf.printSchema()

root
 |-- num: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- Total: long (nullable = true)
 |-- HP: long (nullable = true)
 |-- Attack: long (nullable = true)
 |-- Defense: long (nullable = true)
 |-- Sp. Atk: long (nullable = true)
 |-- Sp. Def: long (nullable = true)
 |-- Speed: long (nullable = true)
 |-- Generation: long (nullable = true)
 |-- Legendary: boolean (nullable = true)
 |-- Type: string (nullable = true)



In [114]:
sdf = sdf.withColumnRenamed("Sp. Atk", "Spec_att")
sdf = sdf.withColumnRenamed("Sp. Def", "Spec_def")

In [115]:
sdf.show(5)

+---+--------------------+-----+---+------+-------+--------+--------+-----+----------+---------+------------+
|num|                Name|Total| HP|Attack|Defense|Spec_att|Spec_def|Speed|Generation|Legendary|        Type|
+---+--------------------+-----+---+------+-------+--------+--------+-----+----------+---------+------------+
|  1|           Bulbasaur|  318| 45|    49|     49|      65|      65|   45|         1|    false|Grass-Poison|
|  2|             Ivysaur|  405| 60|    62|     63|      80|      80|   60|         1|    false|Grass-Poison|
|  3|            Venusaur|  525| 80|    82|     83|     100|     100|   80|         1|    false|Grass-Poison|
|  3|VenusaurMega Venu...|  625| 80|   100|    123|     122|     120|   80|         1|    false|Grass-Poison|
|  4|          Charmander|  309| 39|    52|     43|      60|      50|   65|         1|    false|      Fire- |
+---+--------------------+-----+---+------+-------+--------+--------+-----+----------+---------+------------+
only showi

In [116]:
sdf.columns

['num',
 'Name',
 'Total',
 'HP',
 'Attack',
 'Defense',
 'Spec_att',
 'Spec_def',
 'Speed',
 'Generation',
 'Legendary',
 'Type']

In [118]:
sdf.describe().show()

+-------+------------------+----------------+----------------+-----------------+-----------------+-----------------+----------------+------------------+------------------+------------------+-----------+
|summary|               num|            Name|           Total|               HP|           Attack|          Defense|        Spec_att|          Spec_def|             Speed|        Generation|       Type|
+-------+------------------+----------------+----------------+-----------------+-----------------+-----------------+----------------+------------------+------------------+------------------+-----------+
|  count|               800|             800|             800|              800|              800|              800|             800|               800|               800|               800|        800|
|   mean|         362.81375|            null|        435.1025|         69.25875|         79.00125|          73.8425|           72.82|           71.9025|           68.2775|           3.3237

In [8]:
sdf.select('Speed').show(5)

+-----+
|Speed|
+-----+
|   45|
|   60|
|   80|
|   80|
|   65|
+-----+
only showing top 5 rows



In [9]:
sdf.filter(sdf['Attack'] < 80).show(5)

+----------+-----+---+------+-------+-------+-------+-----+----------+---------+------------+
|      Name|Total| HP|Attack|Defense|Sp. Atk|Sp. Def|Speed|Generation|Legendary|        Type|
+----------+-----+---+------+-------+-------+-------+-----+----------+---------+------------+
| Bulbasaur|  318| 45|    49|     49|     65|     65|   45|         1|    false|Grass-Poison|
|   Ivysaur|  405| 60|    62|     63|     80|     80|   60|         1|    false|Grass-Poison|
|Charmander|  309| 39|    52|     43|     60|     50|   65|         1|    false|      Fire- |
|Charmeleon|  405| 58|    64|     58|     80|     65|   80|         1|    false|      Fire- |
|  Squirtle|  314| 44|    48|     65|     50|     64|   43|         1|    false|     Water- |
+----------+-----+---+------+-------+-------+-------+-----+----------+---------+------------+
only showing top 5 rows



In [24]:
sdf.withColumn('new col', sdf['Total'] / 100).show(6)

+--------------------+-----+---+------+-------+-------+-------+-----+----------+---------+------------+-------+
|                Name|Total| HP|Attack|Defense|Sp. Atk|Sp. Def|Speed|Generation|Legendary|        Type|new col|
+--------------------+-----+---+------+-------+-------+-------+-----+----------+---------+------------+-------+
|           Bulbasaur|  318| 45|    49|     49|     65|     65|   45|         1|    false|Grass-Poison|   3.18|
|             Ivysaur|  405| 60|    62|     63|     80|     80|   60|         1|    false|Grass-Poison|   4.05|
|            Venusaur|  525| 80|    82|     83|    100|    100|   80|         1|    false|Grass-Poison|   5.25|
|VenusaurMega Venu...|  625| 80|   100|    123|    122|    120|   80|         1|    false|Grass-Poison|   6.25|
|          Charmander|  309| 39|    52|     43|     60|     50|   65|         1|    false|      Fire- |   3.09|
|          Charmeleon|  405| 58|    64|     58|     80|     65|   80|         1|    false|      Fire- | 

In [113]:
sdf.dropDuplicates()\
    .groupby(['Legendary'])\
    .agg({"Total": "AVG"})\
    .sort("avg(Total)", ascending=False)\
    .show()

+---------+------------------+
|Legendary|        avg(Total)|
+---------+------------------+
|     true| 637.3846153846154|
|    false|417.21360544217686|
+---------+------------------+



In [17]:
sdf2 = sdf.withColumn("Att_def", sdf.Attack + sdf.Defense)
sdf2 = sdf2.withColumn("is_strong", sdf2.Total > 550)
sdf2.show(5)

+---+--------------------+-----+---+------+-------+-------+-------+-----+----------+---------+------------+-------+---------+
|num|                Name|Total| HP|Attack|Defense|Sp. Atk|Sp. Def|Speed|Generation|Legendary|        Type|Att_def|is_strong|
+---+--------------------+-----+---+------+-------+-------+-------+-----+----------+---------+------------+-------+---------+
|  1|           Bulbasaur|  318| 45|    49|     49|     65|     65|   45|         1|    false|Grass-Poison|     98|    false|
|  2|             Ivysaur|  405| 60|    62|     63|     80|     80|   60|         1|    false|Grass-Poison|    125|    false|
|  3|            Venusaur|  525| 80|    82|     83|    100|    100|   80|         1|    false|Grass-Poison|    165|    false|
|  3|VenusaurMega Venu...|  625| 80|   100|    123|    122|    120|   80|         1|    false|Grass-Poison|    223|     true|
|  4|          Charmander|  309| 39|    52|     43|     60|     50|   65|         1|    false|      Fire- |     95|   

### SQL API

In [129]:
sdf.createOrReplaceTempView("table1")

query = sqlContext.sql("select Generation, avg(Total) from table1 group by 1 order by 1")
query.show()

+----------+------------------+
|Generation|        avg(Total)|
+----------+------------------+
|         1|426.81325301204816|
|         2| 418.2830188679245|
|         3|           436.225|
|         4| 459.0165289256198|
|         5| 434.9878787878788|
|         6| 436.3780487804878|
+----------+------------------+



## Visualization

In [142]:
sdf.select("Total").toPandas().plot(kind="density")

ImportError: No module named matplotlib.pyplot

In [146]:
hdf = sdf.toHandy() # need to install HandySpark
hdf.cols["Total"].hist()

AttributeError: 'DataFrame' object has no attribute 'toHandy'

## ML Example

In [32]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

sdf = sdf.withColumn("label", sdf.Legendary.cast("integer")) # mandatory to have a label column as integer for ML

type_indexer = StringIndexer(inputCol="Type", outputCol="type_index")

type_encoder = OneHotEncoder(inputCol = "type_index", outputCol = "type_fact")

vec_assembler = VectorAssembler(inputCols=["Total", "HP", "Attack", "Defense", "Speed", "type_fact"], outputCol="features")

pipeline = Pipeline(stages=[type_indexer, type_encoder, vec_assembler])

piped_data = pipeline.fit(sdf).transform(sdf)

In [33]:
from pyspark.ml.classification import LogisticRegression

training, test = piped_data.randomSplit([0.7,0.3])

lr = LogisticRegression()

In [34]:
import pyspark.ml.evaluation as evals
import pyspark.ml.tuning as tune
import numpy as np

evaluator = evals.BinaryClassificationEvaluator(metricName = "areaUnderROC")

grid = tune.ParamGridBuilder()

grid = grid.addGrid(lr.regParam, np.arange(0, .1, .01))
grid = grid.addGrid(lr.elasticNetParam, [0,1])

grid = grid.build()

cv = tune.CrossValidator(estimator=lr,
               estimatorParamMaps=grid,
               evaluator=evaluator
               )

best_lr = lr.fit(training)

print(best_lr)

LogisticRegression_414c809cb6f00af4348a


In [40]:
test_results = best_lr.transform(test)

# Evaluate the predictions
print(evaluator.evaluate(test_results))

0.890950792327


## MLlib

In [147]:
from pyspark.mllib.recommendation import ALS
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.clustering import KMeans

In [151]:
from pyspark.mllib.recommendation import Rating

r1 = Rating(1, 1, 1.0)
r2 = Rating(1, 2, 2.0)
r3 = Rating(2, 1, 2.0)
ratings = sc.parallelize([r1, r2, r3])
ratings.collect()

[Rating(user=1, product=1, rating=1.0),
 Rating(user=1, product=2, rating=2.0),
 Rating(user=2, product=1, rating=2.0)]

In [162]:
model = ALS.train(ratings, rank=10, iterations=10)

unrated_RDD = sc.parallelize([(1, 2), (1, 1)])

predictions = model.predictAll(unrated_RDD)
predictions.collect()

preds = predictions.map(lambda x: ((x[0], x[1]), x[2]))
preds.collect()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 16 in stage 136.0 failed 4 times, most recent failure: Lost task 16.3 in stage 136.0 (TID 3651, el8372.bc, executor 7): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/hadoopfs11/hadoop/yarn/local/usercache/id860112/appcache/application_1580124036614_25546/container_e118_1580124036614_25546_01_000008/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/hadoopfs11/hadoop/yarn/local/usercache/id860112/appcache/application_1580124036614_25546/container_e118_1580124036614_25546_01_000008/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/hadoopfs11/hadoop/yarn/local/usercache/id860112/appcache/application_1580124036614_25546/container_e118_1580124036614_25546_01_000008/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/px/devs/miniconda/envs/spark_2.3.0/share/spark-2.3.0/python/pyspark/rdd.py", line 1354, in takeUpToNumLeft
  File "/hadoopfs11/hadoop/yarn/local/usercache/id860112/appcache/application_1580124036614_25546/container_e118_1580124036614_25546_01_000008/pyspark.zip/pyspark/serializers.py", line 145, in load_stream
    yield self._read_with_length(stream)
  File "/hadoopfs11/hadoop/yarn/local/usercache/id860112/appcache/application_1580124036614_25546/container_e118_1580124036614_25546_01_000008/pyspark.zip/pyspark/serializers.py", line 170, in _read_with_length
    return self.loads(obj)
  File "/hadoopfs11/hadoop/yarn/local/usercache/id860112/appcache/application_1580124036614_25546/container_e118_1580124036614_25546_01_000008/pyspark.zip/pyspark/serializers.py", line 562, in loads
    return pickle.loads(obj)
  File "/hadoopfs11/hadoop/yarn/local/usercache/id860112/appcache/application_1580124036614_25546/container_e118_1580124036614_25546_01_000008/pyspark.zip/pyspark/mllib/__init__.py", line 28, in <module>
ImportError: No module named numpy

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
	at org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:141)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor168.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/hadoopfs11/hadoop/yarn/local/usercache/id860112/appcache/application_1580124036614_25546/container_e118_1580124036614_25546_01_000008/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/hadoopfs11/hadoop/yarn/local/usercache/id860112/appcache/application_1580124036614_25546/container_e118_1580124036614_25546_01_000008/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/hadoopfs11/hadoop/yarn/local/usercache/id860112/appcache/application_1580124036614_25546/container_e118_1580124036614_25546_01_000008/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/px/devs/miniconda/envs/spark_2.3.0/share/spark-2.3.0/python/pyspark/rdd.py", line 1354, in takeUpToNumLeft
  File "/hadoopfs11/hadoop/yarn/local/usercache/id860112/appcache/application_1580124036614_25546/container_e118_1580124036614_25546_01_000008/pyspark.zip/pyspark/serializers.py", line 145, in load_stream
    yield self._read_with_length(stream)
  File "/hadoopfs11/hadoop/yarn/local/usercache/id860112/appcache/application_1580124036614_25546/container_e118_1580124036614_25546_01_000008/pyspark.zip/pyspark/serializers.py", line 170, in _read_with_length
    return self.loads(obj)
  File "/hadoopfs11/hadoop/yarn/local/usercache/id860112/appcache/application_1580124036614_25546/container_e118_1580124036614_25546_01_000008/pyspark.zip/pyspark/serializers.py", line 562, in loads
    return pickle.loads(obj)
  File "/hadoopfs11/hadoop/yarn/local/usercache/id860112/appcache/application_1580124036614_25546/container_e118_1580124036614_25546_01_000008/pyspark.zip/pyspark/mllib/__init__.py", line 28, in <module>
ImportError: No module named numpy

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
	at org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [171]:
from pyspark.mllib.feature import HashingTF
sentence = "hello hello world"
words = sentence.split()
tf = HashingTF(numFeatures=100)
words_transformed = tf.transform(words)

words_samples = words_transformed.map(lambda x:LabeledPoint(1, x))

AttributeError: 'SparseVector' object has no attribute 'map'

In [161]:
data = [
LabeledPoint(0.0, [0.0, 1.0]),
LabeledPoint(1.0, [1.0, 0.0]),
]
RDD = sc.parallelize(data)

model = LogisticRegressionWithLBFGS.train(RDD)

lrm.predict([1.0, 0.0])

NameError: name 'LabeledPoint' is not defined

## Spark tables & SDL

In [19]:
sdf.registerTempTable("data")

# SQL statements can be run by using the sql method
x = sqlContext.sql("SELECT Name, Total FROM data WHERE Generation >= 4 AND Generation <= 7 and Defense > 100")
x.show()

+--------------------+-----+
|                Name|Total|
+--------------------+-----+
|            Torterra|  525|
|            Shieldon|  350|
|           Bastiodon|  495|
| WormadamSandy Cloak|  424|
|           Vespiquen|  474|
|            Bronzong|  500|
|           Spiritomb|  485|
|GarchompMega Garc...|  700|
|           Hippowdon|  525|
|             Drapion|  500|
|AbomasnowMega Abo...|  594|
|           Magnezone|  535|
|           Rhyperior|  535|
|           Tangrowth|  535|
|             Leafeon|  525|
|             Glaceon|  525|
|             Gliscor|  510|
|           Probopass|  525|
|            Dusknoir|  525|
|     RotomHeat Rotom|  520|
+--------------------+-----+
only showing top 20 rows



# Close session

In [172]:
rp.sc.stop()