In [1]:
from pyspark.sql import SparkSession
session = SparkSession.builder.getOrCreate()

21/09/14 17:53:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
rdd = session.sparkContext.parallelize([1,2,3])

In [3]:
rdd.take(num=2)

                                                                                

[1, 2]

In [4]:
rdd.count()

3

In [5]:
rdd.collect()

[1, 2, 3]

In [6]:
df = session.createDataFrame([[1,2,3],[4,5,6]], ['Column1','Column2','Column3'])

In [7]:
df.show(n=3)

+-------+-------+-------+
|Column1|Column2|Column3|
+-------+-------+-------+
|      1|      2|      3|
|      4|      5|      6|
+-------+-------+-------+



In [8]:
import pyspark.sql.functions as funcs
import pyspark.sql.types as types

def multiply_by_ten(number):
    return number*10.0

multiply_udf = funcs.udf(multiply_by_ten, types.DoubleType())

In [9]:
transformed_df = df.withColumn('multiplied', multiply_udf('Column1'))

In [10]:
transformed_df.show()

+-------+-------+-------+----------+
|Column1|Column2|Column3|multiplied|
+-------+-------+-------+----------+
|      1|      2|      3|      10.0|
|      4|      5|      6|      40.0|
+-------+-------+-------+----------+



In [15]:
import math

In [16]:
def take_log_in_all_columns(row: types.Row):
    old_row = row.asDict()
    new_row = {f'log({column_name})': math.log(value)
               for column_name, value in old_row.items()}
    return types.Row(**new_row)

In [17]:
logarithmic_dataframe = df.rdd.map(take_log_in_all_columns).toDF()

In [19]:
logarithmic_dataframe.show()

+------------------+------------------+------------------+
|      log(Column1)|      log(Column2)|      log(Column3)|
+------------------+------------------+------------------+
|               0.0|0.6931471805599453|1.0986122886681098|
|1.3862943611198906|1.6094379124341003| 1.791759469228055|
+------------------+------------------+------------------+



In [20]:
df.select('Column1', 'Column2')

DataFrame[Column1: bigint, Column2: bigint]

In [23]:
df.where('Column1 = 3')

DataFrame[Column1: bigint, Column2: bigint, Column3: bigint]

In [31]:
test = df.join(transformed_df, ['Column1'], how='inner')

In [32]:
test.show()

                                                                                

+-------+-------+-------+-------+-------+----------+
|Column1|Column2|Column3|Column2|Column3|multiplied|
+-------+-------+-------+-------+-------+----------+
|      1|      2|      3|      2|      3|      10.0|
|      4|      5|      6|      5|      6|      40.0|
+-------+-------+-------+-------+-------+----------+



In [34]:
test = test.drop('Column2','Column3')

In [36]:
test.show()

                                                                                

+-------+----------+
|Column1|multiplied|
+-------+----------+
|      1|      10.0|
|      4|      40.0|
+-------+----------+



In [37]:
df.createOrReplaceTempView('table1')

In [38]:
df2 = session.sql('SELECT column1 as f1, column2 as f2 from table1')

In [39]:
df2.show()

+---+---+
| f1| f2|
+---+---+
|  1|  2|
|  4|  5|
+---+---+



In [40]:
df3 = df.withColumn(
    'derived_column', df['column1'] + df['column2'] * df['column3']
)


In [41]:
df3.show()

+-------+-------+-------+--------------+
|Column1|Column2|Column3|derived_column|
+-------+-------+-------+--------------+
|      1|      2|      3|             7|
|      4|      5|      6|            34|
+-------+-------+-------+--------------+



In [42]:
ADULT_COLUMN_NAMES = [
     "age",
     "workclass",
     "fnlwgt",
     "education",
     "education_num",
     "marital_status",
     "occupation",
     "relationship",
     "race",
     "sex",
     "capital_gain",
     "capital_loss",
     "hours_per_week",
     "native_country",
     "income"
 ]

In [44]:
csv_df = session.read.csv(
    'adult.data', header=False, inferSchema=True
)

for new_col, old_col in zip(ADULT_COLUMN_NAMES, csv_df.columns):
     csv_df = csv_df.withColumnRenamed(old_col, new_col)

In [46]:
csv_df.show()

+---+-----------------+--------+-------------+-------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+
|age|        workclass|  fnlwgt|    education|education_num|      marital_status|        occupation|  relationship|               race|    sex|capital_gain|capital_loss|hours_per_week|native_country|income|
+---+-----------------+--------+-------------+-------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+
| 39|        State-gov| 77516.0|    Bachelors|         13.0|       Never-married|      Adm-clerical| Not-in-family|              White|   Male|      2174.0|         0.0|          40.0| United-States| <=50K|
| 50| Self-emp-not-inc| 83311.0|    Bachelors|         13.0|  Married-civ-spouse|   Exec-managerial|       Husband|              White|   Male|         0.0|         0.0|   

In [48]:
csv_df.describe().show()

[Stage 52:>                                                         (0 + 1) / 1]

+-------+------------------+------------+------------------+-------------+-----------------+--------------+-----------------+------------+-------------------+-------+------------------+----------------+------------------+--------------+------+
|summary|               age|   workclass|            fnlwgt|    education|    education_num|marital_status|       occupation|relationship|               race|    sex|      capital_gain|    capital_loss|    hours_per_week|native_country|income|
+-------+------------------+------------+------------------+-------------+-----------------+--------------+-----------------+------------+-------------------+-------+------------------+----------------+------------------+--------------+------+
|  count|             32561|       32561|             32561|        32561|            32561|         32561|            32561|       32561|              32561|  32561|             32561|           32561|             32561|         32561| 32561|
|   mean| 38.58164675532

                                                                                

In [49]:
work_hours_df = csv_df.groupBy(
    'age'
).agg(
    funcs.avg('hours_per_week'),
    funcs.stddev_samp('hours_per_week')
).sort('age')

In [50]:
work_hours_df.show()



+---+-------------------+---------------------------+
|age|avg(hours_per_week)|stddev_samp(hours_per_week)|
+---+-------------------+---------------------------+
| 17| 21.367088607594937|         10.021014993616216|
| 18| 25.912727272727274|         11.733362123434848|
| 19| 30.678370786516854|         12.119154493614719|
| 20|  32.28021248339974|         11.726599330994663|
| 21|  34.03472222222222|         12.040389374051912|
| 22|  35.17124183006536|         11.968466821743275|
| 23|  36.71835803876853|         10.916632739093428|
| 24|  39.08897243107769|         10.638975889466733|
| 25|  40.00713436385256|         10.452953398659348|
| 26|  41.06496815286624|          11.29552504314252|
| 27| 42.039520958083834|         10.755941741375546|
| 28|  42.02768166089965|         10.737113530868324|
| 29|  42.36531365313653|         10.206157095904361|
| 30| 42.167247386759584|         10.990266114829758|
| 31| 42.877252252252255|         11.008740019442087|
| 32| 42.878019323671495|   

                                                                                

In [52]:
import pyspark
from pyspark import SparkContext

In [57]:
nums = session.sparkContext.parallelize([1,2,3,4])

In [58]:
nums.take(1)

[1]

In [59]:
squared = nums.map(lambda x: x*x).collect()

In [60]:
for num in squared:
    print('%i ' % (num))

1 
4 
9 
16 
