In [1]:
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder \
      .master("local[1]") \
      .appName("Spark-Exercise") \
      .getOrCreate()

spark_context = spark.sparkContext

In [5]:
# parallelize method is used to create a RDD
data = [("Java", 1), ("Python", 2), ("Scala", 3)]
rdd = spark_context.parallelize(data)

In [16]:
# Transformations return a RDD, actions return values to driver
rdd = rdd.map(lambda x: (x[0], x[1]**2))
rdd.collect()

[('Java', 1), ('Python', 4), ('Scala', 9)]

In [8]:
spark.catalog.listTables()

[]

In [17]:
data = [('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)
]

columns = ["firstname","middlename","lastname","dob","gender","salary"]

df = spark.createDataFrame(data=data, schema=columns)

In [18]:
df.show()

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+



In [19]:
df.printSchema()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)



In [22]:
# We should create a temporary table to make sql queries to the data
df.createOrReplaceTempView("Person")

In [23]:
df2 = spark.sql("SELECT * FROM Person WHERE salary > 3000")
df2.show()

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
+---------+----------+--------+----------+------+------+



In [24]:
df2.printSchema()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)



In [25]:
from pyspark import SparkConf

In [27]:
spark_conf = spark_context.getConf()
spark_conf.getAll()

[('spark.driver.host', '192.168.1.24'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.sql.warehouse.dir',
  'file:/home/tolga/Documents/my-repos/Studies/spark/spark-warehouse'),
 ('spark.submit.pyFiles', ''),
 ('spark.app.startTime', '1654968868487'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.id', 'local-1654968869288'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.master', 'local[1]'),
 ('spark.driver.port', '45651'),
 ('spark.app.name', 'Spark-Exercise')]

In [29]:
spark_conf.set('spark.driver.memory', '2g')

[('spark.sql.warehouse.dir',
  'file:/home/tolga/Documents/my-repos/Studies/spark/spark-warehouse'),
 ('spark.executor.id', 'driver'),
 ('spark.app.startTime', '1654968868487'),
 ('spark.app.id', 'local-1654968869288'),
 ('spark.driver.port', '45651'),
 ('spark.driver.host', '192.168.1.24'),
 ('spark.rdd.compress', 'True'),
 ('spark.driver.memory', '2g'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.submit.pyFiles', ''),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.master', 'local[1]'),
 ('spark.app.name', 'Spark-Exercise')]

In [30]:
# Some useful configs
"""spark.driver.cores
   spark.driver.memory
   spark.executor.memory
   spark.executor.cores
   spark.master
"""



'spark.driver.cores\n   spark.driver.memory\n   spark.executor.memory\n   spark.executor.cores\n   spark.master\n'

In [31]:
# ML with PySpark
spark.read.csv

<bound method DataFrameReader.csv of <pyspark.sql.readwriter.DataFrameReader object at 0x7efe710f8340>>

In [33]:
from pyspark import SparkFiles

url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"
spark_context.addFile(url)
df = spark.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema=True)


                                                                                

In [34]:
df.printSchema()

root
 |-- x: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



In [39]:
# Make conversions
from pyspark.sql.types import FloatType

def convertColumn(df, names, newType):
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df 

CONTI_FEATURES  = ['age', 'fnlwgt','capital-gain', 'educational-num', 'capital-loss', 'hours-per-week']
df = convertColumn(df, CONTI_FEATURES, FloatType())

In [40]:
df.printSchema()

root
 |-- x: integer (nullable = true)
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: float (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: float (nullable = true)
 |-- capital-loss: float (nullable = true)
 |-- hours-per-week: float (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



In [43]:
df.show(5)

+---+----+---------+--------+------------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+
|  x| age|workclass|  fnlwgt|   education|educational-num|    marital-status|       occupation|relationship| race|gender|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+----+---------+--------+------------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+
|  1|25.0|  Private|226802.0|        11th|            7.0|     Never-married|Machine-op-inspct|   Own-child|Black|  Male|         0.0|         0.0|          40.0| United-States| <=50K|
|  2|38.0|  Private| 89814.0|     HS-grad|            9.0|Married-civ-spouse|  Farming-fishing|     Husband|White|  Male|         0.0|         0.0|          50.0| United-States| <=50K|
|  3|28.0|Local-gov|336951.0|  Assoc-acdm|           12.0|Married-civ-spous

In [45]:
from pyspark.ml.feature import StringIndexer

stringIndexer = StringIndexer(inputCol="race", outputCol="new_race")
model = stringIndexer.fit(df)
df = model.transform(df)
df.printSchema()

root
 |-- x: integer (nullable = true)
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: float (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: float (nullable = true)
 |-- capital-loss: float (nullable = true)
 |-- hours-per-week: float (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- new_race: double (nullable = false)



In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder

CAT_FEATURES = ['workclass', 'education', 'marital-status', 'occupation', 'relationship', 'gender', 'native-country']

