In [49]:
# Locate the local Spark installation and put pyspark on sys.path;
# this must run before any pyspark import below.
import findspark
findspark.init()
import warnings
# NOTE(review): this silences *all* warnings, including pyspark deprecation
# notices (e.g. for SQLContext); consider narrowing the filter.
warnings.filterwarnings("ignore")
from pyspark import SparkContext
from pyspark.sql import SparkSession
import pyspark
from operator import add
from pyspark.sql import SQLContext
from pyspark import SparkFiles
from pyspark.sql import Row
# NOTE(review): these star imports can shadow Python built-ins such as
# sum, max, min, round and abs; prefer explicit names (col, FloatType, ...).
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.linalg import DenseVector

#### Create a DataFrame
Here I create a simple DataFrame to demonstrate how to create DataFrames in PySpark.

In [2]:
#Create SparkSession
# Create (or reuse) a SparkSession, the entry point for the DataFrame API.
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

# Three (language, users_count) records; both fields are strings.
data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]
columns = ["language", "users_count"]

# Passing the column names as the schema names the columns directly,
# equivalent to building with default names and renaming via toDF(*columns).
df = spark.createDataFrame(data, columns)
df.show()

# NOTE(review): stopping here leaves `spark` unusable; later cells that call
# `spark` (e.g. the UDF registration at the end) need a live session.
spark.stop()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/26 11:21:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

+--------+-----------+
|language|users_count|
+--------+-----------+
|    Java|      20000|
|  Python|     100000|
|   Scala|       3000|
+--------+-----------+



#### Parallelize
`SparkContext.parallelize` turns a local Python collection into an RDD. The RDD's partitions can be distributed across multiple nodes, so processing does not depend on a single node.

In [3]:
# Get the active SparkContext, or create a local one named "first app".
# getOrCreate keeps the cell re-runnable: constructing SparkContext(...)
# directly raises ValueError when a context is already active.
sc = SparkContext.getOrCreate(pyspark.SparkConf().setMaster("local").setAppName("first app"))

In [4]:
# Distribute a small list of strings as an RDD, then count its elements (an action).
words = sc.parallelize([
    "scala",
    "java",
    "hadoop",
    "spark",
    "akka",
    "spark vs hadoop",
    "pyspark",
    "pyspark and spark",
])
counts = words.count()
counts

                                                                                

8

In [5]:
# collect() returns every element of the RDD to the driver as a Python list.
words.collect()

['scala',
 'java',
 'hadoop',
 'spark',
 'akka',
 'spark vs hadoop',
 'pyspark',
 'pyspark and spark']

In [6]:
def printWords(x):
    """Print one RDD element on its own line (used as the RDD.foreach callback)."""
    text = str(x)
    print(text)

In [7]:
# foreach runs printWords on every element for its side effect and returns None,
# so there is nothing worth assigning. The print calls execute inside the tasks:
# in local mode the lines may appear here, but on a cluster they go to the
# executor logs instead of the notebook.
words.foreach(printWords)

scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark


In [8]:
# Keep only the elements that contain the substring 'spark' (lazy transformation).
words_filter = words.filter(lambda word: 'spark' in word)

In [9]:
# Transformations are lazy: this shows the RDD object, not its elements.
words_filter

PythonRDD[3] at RDD at PythonRDD.scala:53

In [10]:
# collect() is the action that actually executes the filter.
filtered = words_filter.collect()

In [11]:
# The words that contain 'spark', as a Python list.
filtered

['spark', 'spark vs hadoop', 'pyspark', 'pyspark and spark']

In [12]:
# Pair every word with the count 1 (the word-count "map" step); nothing runs yet.
words_map = words.map(lambda word: (word, 1))
words_map

PythonRDD[4] at RDD at PythonRDD.scala:53

In [13]:
# collect() runs the map and returns all (word, 1) pairs to the driver.
mapping = words_map.collect()

In [14]:
# The (word, 1) pairs as a Python list.
mapping

[('scala', 1),
 ('java', 1),
 ('hadoop', 1),
 ('spark', 1),
 ('akka', 1),
 ('spark vs hadoop', 1),
 ('pyspark', 1),
 ('pyspark and spark', 1)]

In [15]:
# A small RDD of integers for the reduce example below.
nums = sc.parallelize([1, 2, 3, 4, 5])
nums

ParallelCollectionRDD[5] at readRDDFromFile at PythonRDD.scala:274

In [16]:
# Fold the RDD into a single value by pairwise addition, i.e. the sum of its elements.
adding = nums.reduce(lambda left, right: left + right)

In [17]:
# Sum of 1..5.
adding

15

In [18]:
# Two pair RDDs keyed by the same words; join matches on the key and yields
# (key, (value_from_x, value_from_y)) tuples.
x = sc.parallelize([('spark', 1), ('hadoop', 4)])
y = sc.parallelize([('spark', 2), ('hadoop', 5)])
joined = x.join(y)
final = joined.collect()

                                                                                

In [19]:
# Joined pairs: (key, (value from x, value from y)).
# NOTE(review): `final` is rebound later to the accumulator's value.
final

[('hadoop', (4, 5)), ('spark', (1, 2))]

In [20]:
# Mark `words` to be kept once computed; cache() returns the same RDD (see the repr below).
words.cache()

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274

In [21]:
# `words` was already cached in the previous cell, so calling persist() again
# is redundant; just read the flag.
caching = words.is_cached

In [22]:
# Whether `words` is marked as cached.
caching

True

## Broadcast variables
A broadcast variable keeps a read-only copy of a value cached on every node. The value is shipped to each machine once, rather than being sent along with every task.
The following code block shows the constructor signature of PySpark's `Broadcast` class.
```
class pyspark.Broadcast (
   sc = None, 
   value = None, 
   pickle_registry = None, 
   path = None
)
```

A `Broadcast` variable has an attribute called `value`, which holds the broadcast data. Read it to access the broadcast value.

In [23]:
# Broadcast a small list; tasks and the driver read it through words_new.value.
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"]) 

In [24]:
# The whole broadcast value.
words_new.value

['scala', 'java', 'hadoop', 'spark', 'akka']

In [25]:
# The value is an ordinary Python list, so it can be indexed.
words_new.value[2] 

'hadoop'

The following example shows how to use an Accumulator variable. Like a broadcast variable, an Accumulator has an attribute called `value`, which holds the accumulator's current value. Unlike a broadcast variable, `value` can be read only in the driver program; tasks running on workers can only add to the accumulator.

In this example, tasks running on the workers each add one element of an RDD to the accumulator. The driver then reads the accumulated total (10 + 20 + 30 + 40 + 50 = 150).

In [26]:
# An accumulator starting at 10; tasks add to it and the driver reads the result.
num = sc.accumulator(10)
num

Accumulator<id=0, value=10>

In [27]:
def f(x):
    """Add one RDD element to the module-level accumulator `num` (runs inside tasks)."""
    # Accumulator.add updates `num` in place, so no `global` rebinding is needed.
    num.add(x)
rdd = sc.parallelize([20,30,40,50])
rdd.foreach(f)

In [28]:
# Read the accumulated total on the driver: 10 + 20 + 30 + 40 + 50.
# NOTE(review): this rebinds `final`, which held the join result above.
final = num.value

In [29]:
# The accumulator's final value.
final

150

`StorageLevel` decides how an RDD is stored: in memory, on disk, or both. It also decides whether the RDD is stored serialized and whether its partitions are replicated. For example, `MEMORY_AND_DISK_2` keeps partitions in memory, stores those that do not fit on disk, and replicates each partition on two nodes.

In [30]:
# Persist a tiny RDD in memory and on disk, with each partition replicated twice.
rdd1 = sc.parallelize([1,2])
rdd1.persist(pyspark.StorageLevel.MEMORY_AND_DISK_2)
# Show the storage level now attached to the RDD.
print(rdd1.getStorageLevel())

Disk Memory Serialized 2x Replicated


In [42]:
# Stop the current SparkContext so that a new one can be started in the next cell.
sc.stop()

In [47]:
# Reuse the active SparkContext or start one with the default configuration.
# getOrCreate keeps the cell re-runnable; SparkContext() raises ValueError
# when a context is already running.
sc = SparkContext.getOrCreate()
nums = sc.parallelize([4,3,2,1])
# take(2) returns the first two elements without collecting the whole RDD.
nums.take(2)

                                                                                

[4, 3]

In [48]:
# Square every element in the tasks, then bring the results back to the driver.
squared = nums.map(lambda x: x*x).collect()
# Use a fresh loop variable: `num` holds the accumulator created earlier and
# would otherwise be silently rebound to an int.
for value in squared:
    print('%i ' % (value))

16 
9 
4 
1 


In [51]:
# SQLContext is deprecated since Spark 3.0. The active SparkSession (built on the
# current `sc`) exposes the same createDataFrame/read API used below, so the
# `sqlContext` name is kept for those cells.
sqlContext = SparkSession.builder.getOrCreate()
lp = [('John',19),('Smith',29),('Adam',35),('Henry',50)]
rdd = sc.parallelize(lp)

In [53]:
# Turn each (name, age) tuple into a Row with named fields; age is coerced to int.
ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))

In [55]:
# Build a DataFrame from the RDD of Rows; the schema is inferred from the Row fields.
df = sqlContext.createDataFrame(ppl)

In [57]:
# Render the rows as a text table.
df.show()

+-----+---+
| name|age|
+-----+---+
| John| 19|
|Smith| 29|
| Adam| 35|
|Henry| 50|
+-----+---+



In [58]:
# age was inferred as long from the Python ints.
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)



In [138]:
# Register the Adult census CSV with SparkFiles so it is downloaded for this job.
# NOTE: re-running this cell logs a warning because the path was already added.
url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"
sc.addFile(url)
# SQLContext is deprecated; the active SparkSession offers the same `read` API.
sqlContext = SparkSession.builder.getOrCreate()

22/04/26 14:22:49 WARN SparkContext: The path https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv has been added already. Overwriting of added paths is not supported in the current version.


In [139]:
# Read the downloaded copy: the first row is the header and column types are inferred.
df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)

                                                                                

In [63]:
# With inferSchema=True, numeric columns come back as integers.
df.printSchema()

root
 |-- x: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



In [66]:
# First 5 rows, without truncating long cell values.
df.show(5, truncate=False)

+---+---+---------+------+------------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+
|x  |age|workclass|fnlwgt|education   |educational-num|marital-status    |occupation       |relationship|race |gender|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+---+---------+------+------------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+
|1  |25 |Private  |226802|11th        |7              |Never-married     |Machine-op-inspct|Own-child   |Black|Male  |0           |0           |40            |United-States |<=50K |
|2  |38 |Private  |89814 |HS-grad     |9              |Married-civ-spouse|Farming-fishing  |Husband     |White|Male  |0           |0           |50            |United-States |<=50K |
|3  |28 |Local-gov|336951|Assoc-acdm  |12             |Married-civ-spouse|Protective-serv 

In [69]:
# Re-read the same file without schema inference: every column comes back as a
# string, so the numeric columns must be cast explicitly.
df_string = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= False)
# example: all string - must convert
df_string.printSchema()

root
 |-- x: string (nullable = true)
 |-- age: string (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: string (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: string (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: string (nullable = true)
 |-- capital-loss: string (nullable = true)
 |-- hours-per-week: string (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



In [75]:

def convertColumn(df, names, newType):
    """Cast each listed column of `df` to `newType`.

    Args:
        df: a Spark DataFrame.
        names: iterable of column names to cast, applied in order.
        newType: target Spark SQL data type, e.g. FloatType().

    Returns:
        The DataFrame produced by chaining one withColumn call per name; with
        no names, `df` itself is returned.
    """
    result = df
    for column_name in names:
        # Index `result` (not the input) so each cast builds on the latest frame.
        result = result.withColumn(column_name, result[column_name].cast(newType))
    return result
# The continuous (numeric) columns; cast from string to float in df_string.
CONTI_FEATURES  = ['age', 'fnlwgt','capital-gain', 'educational-num', 'capital-loss', 'hours-per-week']
df_string = convertColumn(df_string, CONTI_FEATURES, FloatType())

In [77]:
# Confirm the cast: the CONTI_FEATURES columns are float, the rest still string.
df_string.printSchema() # Now the number strings have been converted to float

root
 |-- x: string (nullable = true)
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: float (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: float (nullable = true)
 |-- capital-loss: float (nullable = true)
 |-- hours-per-week: float (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



In [78]:
# Project two columns; show() prints only the first 20 rows.
df.select('x', 'age').show()

+---+---+
|  x|age|
+---+---+
|  1| 25|
|  2| 38|
|  3| 28|
|  4| 44|
|  5| 18|
|  6| 34|
|  7| 29|
|  8| 63|
|  9| 24|
| 10| 55|
| 11| 65|
| 12| 36|
| 13| 26|
| 14| 58|
| 15| 48|
| 16| 43|
| 17| 20|
| 18| 43|
| 19| 37|
| 20| 40|
+---+---+
only showing top 20 rows



In [79]:
# Observations per education group, smallest groups first.
df.groupBy('education').count().orderBy('count').show()



+------------+-----+
|   education|count|
+------------+-----+
|   Preschool|   83|
|     1st-4th|  247|
|     5th-6th|  509|
|   Doctorate|  594|
|        12th|  657|
|         9th|  756|
| Prof-school|  834|
|     7th-8th|  955|
|        10th| 1389|
|  Assoc-acdm| 1601|
|        11th| 1812|
|   Assoc-voc| 2061|
|     Masters| 2657|
|   Bachelors| 8025|
|Some-college|10878|
|     HS-grad|15784|
+------------+-----+



                                                                                

In [80]:
# Summary statistics (count, mean, stddev, min, max) for every column.
df.describe().show()

[Stage 20:>                                                         (0 + 1) / 1]

+-------+------------------+------------------+-----------+------------------+------------+------------------+--------------+----------------+------------+------------------+------+------------------+-----------------+------------------+--------------+------+
|summary|                 x|               age|  workclass|            fnlwgt|   education|   educational-num|marital-status|      occupation|relationship|              race|gender|      capital-gain|     capital-loss|    hours-per-week|native-country|income|
+-------+------------------+------------------+-----------+------------------+------------+------------------+--------------+----------------+------------+------------------+------+------------------+-----------------+------------------+--------------+------+
|  count|             48842|             48842|      48842|             48842|       48842|             48842|         48842|           48842|       48842|             48842| 48842|             48842|            48842|  

                                                                                

In [82]:
# The same summary restricted to one column.
df.describe('capital-gain').show()



+-------+------------------+
|summary|      capital-gain|
+-------+------------------+
|  count|             48842|
|   mean|1079.0676262233324|
| stddev| 7452.019057655418|
|    min|                 0|
|    max|             99999|
+-------+------------------+



                                                                                

#### Crosstab computation

In [85]:
# Pairwise frequency table of age vs. workclass; the key column is named age_workclass.
df.crosstab('age', 'workclass').sort('age_workclass').show()

+-------------+---+-----------+---------+------------+-------+------------+----------------+---------+-----------+
|age_workclass|  ?|Federal-gov|Local-gov|Never-worked|Private|Self-emp-inc|Self-emp-not-inc|State-gov|Without-pay|
+-------------+---+-----------+---------+------------+-------+------------+----------------+---------+-----------+
|           17| 97|          2|       21|           2|    454|           8|               9|        2|          0|
|           18|154|          5|       15|           4|    638|          12|              20|       14|          0|
|           19|183|          6|       18|           0|    784|           6|              24|       29|          3|
|           20|184|         13|       20|           2|    834|          11|              16|       33|          0|
|           21|147|          4|       22|           0|    859|           4|              15|       44|          1|
|           22|128|         16|       34|           0|    924|           9|     

#### More filtering and GroupBy Practice

In [91]:
# Number of people older than 40.
df.filter(col('age') > 40).count()

                                                                                

20211

In [95]:
# Number of people aged 40 or younger (complements the previous count).
df.where(col('age') <= 40).count()

28631

In [97]:
# Most common ages first; show() prints only the top 20 rows.
df.groupBy('age').count().orderBy('count', ascending=False).show()

+---+-----+
|age|count|
+---+-----+
| 36| 1348|
| 35| 1337|
| 33| 1335|
| 23| 1329|
| 31| 1325|
| 34| 1303|
| 28| 1280|
| 37| 1280|
| 30| 1278|
| 38| 1264|
| 32| 1253|
| 41| 1235|
| 27| 1232|
| 29| 1223|
| 24| 1206|
| 39| 1206|
| 25| 1195|
| 40| 1187|
| 22| 1178|
| 42| 1165|
+---+-----+
only showing top 20 rows



In [99]:
# Mean capital gain per marital status; the dict form names the result avg(capital-gain).
df.groupby('marital-status').agg({'capital-gain': 'mean'}).show()



+--------------------+------------------+
|      marital-status| avg(capital-gain)|
+--------------------+------------------+
|           Separated| 581.8424836601307|
|       Never-married|  384.382639449029|
|Married-spouse-ab...| 629.0047770700637|
|            Divorced| 793.6755615860094|
|             Widowed| 603.6442687747035|
|   Married-AF-spouse|2971.6216216216217|
|  Married-civ-spouse|1739.7006121810625|
+--------------------+------------------+



                                                                                

In [103]:

# Age squared as an expression; ** on a Column becomes POWER(age, 2) and yields doubles.
age_square = df.select(col('age')**2)
age_square.show()

+-------------+
|POWER(age, 2)|
+-------------+
|        625.0|
|       1444.0|
|        784.0|
|       1936.0|
|        324.0|
|       1156.0|
|        841.0|
|       3969.0|
|        576.0|
|       3025.0|
|       4225.0|
|       1296.0|
|        676.0|
|       3364.0|
|       2304.0|
|       1849.0|
|        400.0|
|       1849.0|
|       1369.0|
|       1600.0|
+-------------+
only showing top 20 rows



In [105]:
# Append age squared as a new 'age-square' column.
# NOTE(review): the schema printed by the next cell shows string-typed columns,
# unlike the inferSchema=True frame, so these cells appear to have been run out
# of order; re-run top to bottom to confirm the intended `df`.
df = df.withColumn('age-square', col('age')**2)

In [109]:
# Inspect the schema after adding the column.
df.printSchema() #age-square is now at the end of the dataframe

root
 |-- x: string (nullable = true)
 |-- age: string (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: string (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: string (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: string (nullable = true)
 |-- capital-loss: string (nullable = true)
 |-- hours-per-week: string (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- age-square: double (nullable = true)



In [110]:
# The first row as a Row object.
df.first()

Row(x='1', age='25', workclass='Private', fnlwgt='226802', education='11th', educational-num='7', marital-status='Never-married', occupation='Machine-op-inspct', relationship='Own-child', race='Black', gender='Male', capital-gain='0', capital-loss='0', hours-per-week='40', native-country='United-States', income='<=50K', age-square=625.0)

#### Build data processing pipeline

In [124]:
# Single-column demo: map workclass strings to numeric indices, then one-hot
# encode the indices. dropLast=False keeps a slot for every category.
indexer = StringIndexer(inputCol='workclass', outputCol='workclass_encoded')
model = indexer.fit(df)  # NOTE(review): `model` is later rebound to a transformed DataFrame
indexed = model.transform(df)

encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol='workclass_vec')
ohe = encoder.fit(indexed)
encoded = ohe.transform(indexed)
encoded.show(2)

                                                                                

+---+---+---------+------+---------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+----------+-----------------+-------------+
|  x|age|workclass|fnlwgt|education|educational-num|    marital-status|       occupation|relationship| race|gender|capital-gain|capital-loss|hours-per-week|native-country|income|age-square|workclass_encoded|workclass_vec|
+---+---+---------+------+---------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+----------+-----------------+-------------+
|  1| 25|  Private|226802|     11th|              7|     Never-married|Machine-op-inspct|   Own-child|Black|  Male|           0|           0|            40| United-States| <=50K|     625.0|              0.0|(9,[0],[1.0])|
|  2| 38|  Private| 89814|  HS-grad|              9|Married-civ-spouse|  Farming-fishing|     Husband|White|  Ma

In [132]:
# Categorical columns: each gets a StringIndexer (string -> numeric index)
# followed by a OneHotEncoder (index -> sparse 0/1 vector).
CATE_FEATURES = ['workclass', 'education', 'marital-status', 'occupation', 'relationship', 'race', 'gender', 'native-country']
stages = []
for categoricalCol in CATE_FEATURES:
    index_col = categoricalCol + 'Index'
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=index_col)
    encoder = OneHotEncoder(inputCols=[index_col], outputCols=[categoricalCol + 'classVec'])
    stages.extend([stringIndexer, encoder])
stages

[StringIndexer_bbcbaf9fe1e8,
 OneHotEncoder_f98fee62dfeb,
 StringIndexer_f157372b71fa,
 OneHotEncoder_9fedb6cd0798,
 StringIndexer_0f1d2d5a480e,
 OneHotEncoder_040a5d70f973,
 StringIndexer_ec76f2936067,
 OneHotEncoder_fbccd51a378f,
 StringIndexer_dea1f7df56d4,
 OneHotEncoder_880ab465f68d,
 StringIndexer_f0538640e59c,
 OneHotEncoder_bcea3f54ed37,
 StringIndexer_5c488a23f81d,
 OneHotEncoder_d0be1d80efb2,
 StringIndexer_cbe062c13dbf,
 OneHotEncoder_63fdd55aa9fe]

In [133]:
# VectorAssembler inputs: every one-hot vector column plus the continuous columns.
assemblerInputs = [c + 'classVec' for c in CATE_FEATURES] + CONTI_FEATURES

In [134]:
# Combine all feature columns into a single 'features' vector column.
# NOTE(review): re-running this cell appends a second assembler to `stages`.
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol='features')
stages += [assembler]

In [140]:
# df was re-read with inferSchema=True (In [139]), so the numeric columns are integers again.
df.printSchema()

root
 |-- x: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



In [141]:
# Fit every indexer/encoder/assembler stage on df, then apply them in order.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df)
# NOTE(review): despite its name, `model` is the transformed DataFrame, not a model.
model = pipelineModel.transform(df)

In [144]:
# First transformed row: original columns plus the *Index, *classVec and features columns.
model.take(1)

[Row(x=1, age=25, workclass='Private', fnlwgt=226802, education='11th', educational-num=7, marital-status='Never-married', occupation='Machine-op-inspct', relationship='Own-child', race='Black', gender='Male', capital-gain=0, capital-loss=0, hours-per-week=40, native-country='United-States', income='<=50K', workclassIndex=0.0, workclassclassVec=SparseVector(8, {0: 1.0}), educationIndex=5.0, educationclassVec=SparseVector(15, {5: 1.0}), marital-statusIndex=1.0, marital-statusclassVec=SparseVector(6, {1: 1.0}), occupationIndex=6.0, occupationclassVec=SparseVector(14, {6: 1.0}), relationshipIndex=2.0, relationshipclassVec=SparseVector(5, {2: 1.0}), raceIndex=1.0, raceclassVec=SparseVector(4, {1: 1.0}), genderIndex=0.0, genderclassVec=SparseVector(1, {0: 1.0}), native-countryIndex=0.0, native-countryclassVec=SparseVector(41, {0: 1.0}), features=SparseVector(100, {0: 1.0, 13: 1.0, 24: 1.0, 35: 1.0, 45: 1.0, 49: 1.0, 52: 1.0, 53: 1.0, 94: 25.0, 95: 226802.0, 97: 7.0, 99: 40.0}))]

In [145]:
# No pipeline stage produces a 'newlabel' column, so reading x['newlabel'] would
# fail at the first action. Index the income strings into a numeric label here.
# The guard keeps the cell safe to re-run, or to use with a pipeline that
# already adds 'newlabel'.
labeled = model
if 'newlabel' not in labeled.columns:
    labeled = StringIndexer(inputCol='income', outputCol='newlabel').fit(labeled).transform(labeled)
# (label, dense feature vector) pairs; toArray() densifies the SparseVector
# produced by VectorAssembler.
input_data = labeled.rdd.map(lambda x: (x['newlabel'], DenseVector(x['features'].toArray())))

In [146]:
# Still lazy: nothing is computed yet, so errors in the map surface only at the first action.
input_data

PythonRDD[392] at RDD at PythonRDD.scala:53

#### User defined functions

In [150]:
def cube(s):
    """Return s cubed, or None when s is None.

    Registered below as a SQL UDF. Spark hands SQL NULLs to Python UDFs as None,
    and None * None would raise a TypeError inside the task.
    """
    if s is None:
        return None
    return s * s * s
# `spark` was stopped in the first cell, so fetch the live session on the current
# SparkContext. Without an explicit return type the UDF would return strings.
# LongType assumes integer inputs (e.g. age); use DoubleType for float columns.
active_spark = SparkSession.builder.getOrCreate()
active_spark.udf.register("cubewithPython", cube, LongType())

<function __main__.cube(s)>