In [1]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

In [2]:
sc = SparkContext('local')
spark = SparkSession(sc)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/12/08 20:31:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/12/08 20:31:30 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import concat, lit, unix_timestamp

In [4]:
consumption_schema = StructType(fields=[
    StructField('Date', StringType()),
    StructField('Time', StringType()),
    StructField('Global_active_power', DoubleType()),
    StructField('Global_reactive_power', DoubleType()),
    StructField('Voltage', DoubleType()),
    StructField('Global_intensity', DoubleType()),
    StructField('Sub_metering_1', DoubleType()),
    StructField('Sub_metering_2', DoubleType()),
    StructField('Sub_metering_3', DoubleType()),
])

In [5]:
rdd = sc.textFile('power-consumption/household-power-consumption-p1.txt')
df = spark.read.option('delimiter', ';').option('header', True).schema(consumption_schema).csv(rdd)

Traceback (most recent call last):                                  (0 + 1) / 1]
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 663, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError
                                                                                

In [6]:
display(df)

DataFrame[Date: string, Time: string, Global_active_power: double, Global_reactive_power: double, Voltage: double, Global_intensity: double, Sub_metering_1: double, Sub_metering_2: double, Sub_metering_3: double]

In [7]:
renamed_df = df.withColumnRenamed('Global_active_power', 'global_active_pow') \
    .withColumnRenamed('Global_reactive_power', 'global_reactive_pow') \
    .withColumnRenamed('Global_intensity', 'global_intesity') \
    .withColumnRenamed('Voltage', 'voltage') \
    .withColumnRenamed('Sub_metering_1', 'sub_metering_1') \
    .withColumnRenamed('Sub_metering_2', 'sub_metering_2') \
    .withColumnRenamed('Sub_metering_3', 'sub_metering_3')

In [8]:
selected_df = renamed_df.select(
    'global_active_pow', 'global_reactive_pow',
    'global_intesity', 'voltage', 'sub_metering_1',
    'sub_metering_2', 'sub_metering_3',
    unix_timestamp(concat(df.Date, lit(' '), df.Time), 'd/M/yyyy HH:mm:ss').alias('record_timestamp')
)

In [9]:
selected_df.show(4)

+-----------------+-------------------+---------------+-------+--------------+--------------+--------------+----------------+
|global_active_pow|global_reactive_pow|global_intesity|voltage|sub_metering_1|sub_metering_2|sub_metering_3|record_timestamp|
+-----------------+-------------------+---------------+-------+--------------+--------------+--------------+----------------+
|            4.216|              0.418|           18.4| 234.84|           0.0|           1.0|          17.0|      1166289840|
|             5.36|              0.436|           23.0| 233.63|           0.0|           1.0|          16.0|      1166289900|
|            5.374|              0.498|           23.0| 233.29|           0.0|           2.0|          17.0|      1166289960|
|            5.388|              0.502|           23.0| 233.74|           0.0|           1.0|          17.0|      1166290020|
+-----------------+-------------------+---------------+-------+--------------+--------------+--------------+----------

In [10]:
display(selected_df)

DataFrame[global_active_pow: double, global_reactive_pow: double, global_intesity: double, voltage: double, sub_metering_1: double, sub_metering_2: double, sub_metering_3: double, record_timestamp: bigint]

In [12]:
selected_df.agg({"sub_metering_1": "max", "sub_metering_2": "max", "sub_metering_3": "max"}).collect()

DataFrame[max(sub_metering_2): double, max(sub_metering_1): double, max(sub_metering_3): double]

In [14]:
selected_df.agg({"record_timestamp": "max"}).collect()

                                                                                

[Row(max(record_timestamp)=1197418620)]