# Fundamentals

In [0]:
import pyspark
import pandas as pd



In [0]:
# Load the `people_json` table through the SparkSession entry point (`spark`),
# the same one later cells use for `createDataFrame` and `sql`, instead of the
# legacy `sqlContext` handle.
df = spark.sql("SELECT * FROM people_json")


[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-1820800551102545>:1[0m
[0;32m----> 1[0m df [38;5;241m=[39m [43msqlContext[49m[38;5;241;43m.[39;49m[43msql[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mSELECT * FROM people_json[39;49m[38;5;124;43m"[39;49m[43m)[49m

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     47[0m [38;5;28;01mtry[39;00m:
[0;32m---> 48[0m     res [38;5;241m=[39m [43mfunc[49m[43m([49m[38;5;241;43m*[39;49m[43margs[49m[43m,[49m[43m [49m[38;5;241;43m*[39;49m[38;5;241;43m*[39;49m[43mkwargs[49m[43m)[49m
[1;32m     49[0m     logger[38;5;241m.[39mlog_success(
[1;32m     50[0m         module_

In [0]:
# Print the rows of the loaded people DataFrame as a text table.
df.show()



In [0]:
# Print the column names, types and nullability of `df`.
df.printSchema()



In [0]:
# Column names of `df`, displayed as the cell's output.
df.columns



In [0]:
# `describe()` builds a summary-statistics DataFrame; `show()` prints it.
df.describe().show()



In [0]:
from pyspark.sql.types import (StructField , StringType, 
                               IntegerType, StructType)



In [0]:
# Explicit schema for the people data: an integer `age` and a string `name`,
# both allowed to be null.
data_schema = [
    StructField('age', IntegerType(), True),
    StructField('name', StringType(), True),
]

# Wrap the field list in the StructType that `createDataFrame` accepts.
final_struct = StructType(data_schema)



In [0]:
# Rebuild `df` with the explicit schema: take the underlying RDD of rows and
# hand it back to Spark together with `final_struct`.
df = spark.createDataFrame(df.rdd, final_struct)

# Expose the re-typed DataFrame to SQL under the name `people_json_file`.
df.createOrReplaceTempView("people_json_file")



In [0]:
# Check the schema after `df` was re-created with `final_struct`.
df.printSchema()



In [0]:
# Inspect the Python type returned by attribute-style column access.
type(df.age)



In [0]:
# `select` returns a new DataFrame; without `.show()` only its repr is displayed.
df.select('age')



In [0]:
# Select just the `age` column and print its rows.
df.select('age').show()



In [0]:
# First two rows of `df`, displayed as the cell's output.
# (The stray `print()` that only emitted a blank line has been removed.)
df.head(2)



In [0]:
# Select several columns at once by passing a list of names; the output
# columns follow the order given here (name, then age).
df.select(['name', 'age']).show()



In [0]:
# Print a copy of `df` with an extra `double_age` column (age * 2);
# the result is not assigned, so `df` itself is unchanged.
df.withColumn('double_age', df['age']*2).show()



In [0]:
# Print a copy of `df` with `age` renamed to `my_new_age`;
# the result is not assigned, so `df` itself is unchanged.
df.withColumnRenamed('age', 'my_new_age').show()



In [0]:
# the equivalent of a .apply in pandas with pyspark and a custom function


from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# define the custom function
def my_func(age):
    """Return three times ``age``, or ``None`` when ``age`` is ``None``."""
    # Missing ages are passed through untouched so the UDF yields null for them.
    if age is None:
        return None
    return age * 3

# Wrap the plain Python function as a Spark UDF that returns an integer column.
my_udf = udf(my_func, returnType=IntegerType())

# Apply the UDF to the `age` column only and store the outcome in a new
# `result` column.
result_df = df.withColumn('result', my_udf('age'))

# Print the DataFrame including the new column.
result_df.show()



In [0]:
# Register `df` as the temporary view `people` so it can be queried with SQL.
df.createOrReplaceTempView('people')



In [0]:
# Query the `people` temporary view with SQL and print every row it returns.
results = spark.sql('SELECT * from people')
results.show()



# Basic Operations

In [0]:
# Load the stock table via the SparkSession (`spark`) rather than the legacy
# `sqlContext`, matching the entry point already used on the next line.
stock_df = spark.sql("SELECT * from appl_stock_csv")
# Round-trip through the RDD so Spark re-infers the schema from the Python rows.
# NOTE(review): presumably done to normalise the column types - confirm it is
# still needed before removing it.
stock_df = spark.createDataFrame(stock_df.rdd)
stock_df.show(5)



In [0]:
# Print the column names and types of the stock DataFrame.
stock_df.printSchema()



In [0]:
# SQL-style string predicate: keep the rows whose Close is below 200.
stock_df.filter("Close < 200").show()



In [0]:
# Same filter, then keep only the Date, Open and Close columns.
stock_df.filter("Close < 200").select(['Date','Open', 'Close']).show()



In [0]:
# Filter written as a Column expression instead of a SQL string:
# rows with Close below 500, showing only the High column.
stock_df.filter(stock_df['Close'] < 500).select('High').show()



In [0]:
# select the days where the close price was greater than the opening price
from pyspark.sql.functions import date_format

# Rows that closed above their open, reduced to a single `date_only` column
# (the Date rendered as `yyyy-MM-dd`) and collected to the driver as Row objects.
result = (
    stock_df
    .filter("Close > Open")
    .select(date_format("Date", "yyyy-MM-dd").alias("date_only"))
    .collect()
)



In [0]:
# Take the first collected Row and view it as a plain dict.
row = result[0]
row.asDict()



In [0]:
# Same filter, but keep every column instead of only the formatted date.
result = stock_df.filter("Close > Open").collect()



In [0]:
# First Row of the collected, unprojected result.
row = result[0]



In [0]:
# View that Row as a dict of column name -> value.
row.asDict()



# Groupby and Aggregate Functions

In [0]:
# Load the sales table via the SparkSession (`spark`) rather than the legacy
# `sqlContext`, matching the entry point already used on the next line.
data = spark.sql("SELECT * FROM sales_info_csv")
# Round-trip through the RDD so Spark re-infers the schema from the Python rows.
# NOTE(review): presumably done to normalise the column types - confirm it is
# still needed before removing it.
data = spark.createDataFrame(data.rdd)
data.show()



In [0]:
# Print the column names and types of the sales DataFrame.
data.printSchema()



In [0]:
# `groupby` on its own only returns a grouped object (shown as the cell's repr);
# nothing is computed until an aggregate is applied to it.
data.groupby("Company")



In [0]:
# Group by the (Company, Person) pair and average the numeric columns.
data.groupby(["Company", "Person"]).mean().show()



In [0]:
# Dict form of `agg`: column name -> aggregate name. Here: total Sales per company.
data.groupby("Company").agg({'Sales': 'sum'}).show()



In [0]:
from pyspark.sql.functions import mean, median, skewness, kurtosis, min, max, approx_count_distinct, stddev


# One group per distinct Company value.
grouped_df = data.groupBy("Company")

# Summary statistics of Sales, one output column per statistic.
# NOTE: `min` and `max` here are the Spark column functions imported above,
# not Python's built-ins.
sales_stats = [
    mean("Sales").alias("mean"),
    median("Sales").alias("median"),
    stddev("Sales").alias("Stdev"),
    skewness("Sales").alias("skew"),
    kurtosis("Sales").alias("kurtosis"),
    min("Sales").alias("min"),
    max("Sales").alias("max"),
    approx_count_distinct("Sales").alias('Count'),
]
agg_df = grouped_df.agg(*sales_stats)

# Print one row of statistics per company.
agg_df.show()



In [0]:
# we can get rid of huge decimals this way
from pyspark.sql.functions import format_number


# One group per distinct Company value.
grouped_df = data.groupBy("Company")

# Same statistics as before, with the long-decimal ones passed through
# `format_number(..., 3)` for display; min, max and Count are left as computed.
formatted_stats = [
    format_number(mean("Sales"), 3).alias("mean"),
    format_number(median("Sales"), 3).alias("median"),
    format_number(stddev("Sales"), 3).alias("Stdev"),
    format_number(skewness("Sales"), 3).alias("skew"),
    format_number(kurtosis("Sales"), 3).alias("kurtosis"),
    min("Sales").alias("min"),
    max("Sales").alias("max"),
    approx_count_distinct("Sales").alias('Count'),
]
agg_df = grouped_df.agg(*formatted_stats)

# Print one row of formatted statistics per company.
agg_df.show()



In [0]:
# Sort the per-company statistics by the `Count` column (default order).
agg_df.orderBy('Count').show()



In [0]:
# Same sort, explicitly descending via the Column's `.desc()`.
agg_df.orderBy(agg_df['Count'].desc()).show()



# Missing Values

In [0]:
table_name = 'containsnull'

# Read the whole table by name through the SparkSession instead of building a
# `SELECT *` string for the legacy `sqlContext`.
df = spark.table(table_name)
# Round-trip through the RDD so Spark re-infers the schema from the Python rows.
# NOTE(review): presumably done to normalise the column types - confirm it is
# still needed before removing it.
df = spark.createDataFrame(df.rdd)
df.show()
df.printSchema()

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-1820800551102607>:3[0m
[1;32m      1[0m table_name [38;5;241m=[39m [38;5;124m'[39m[38;5;124mcontainsnull[39m[38;5;124m'[39m
[0;32m----> 3[0m df [38;5;241m=[39m sqlContext[38;5;241m.[39msql([38;5;124mf[39m[38;5;124m"[39m[38;5;124mSELECT * FROM [39m[38;5;132;01m{[39;00mtable_name[38;5;132;01m}[39;00m[38;5;124m"[39m)
[1;32m      4[0m df [38;5;241m=[39m spark[38;5;241m.[39mcreateDataFrame(df[38;5;241m.[39mrdd)
[1;32m      5[0m df[38;5;241m.[39mshow()

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     47[0m [38;5;28;01mtry[39;00m:
[0;32m---> 48[0m     res [38;5;241m=

In [0]:
# Default drop: remove every row that contains at least one null.
df.na.drop().show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [0]:
# `thresh` is the minimum number of non-null values a row must have to be kept.
# With thresh=2 only emp2 (a single non-null value, its Id) is dropped.
df.na.drop(thresh=2).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [0]:
# `how` selects the drop rule:
#   'any' - drop a row if any of its values is null
#   'all' - drop a row only if all of its values are null
# Every row here has an Id, so how='all' keeps all four rows.
df.na.drop(how= 'any').show()
df.na.drop(how='all').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [0]:
# `subset` restricts the null check to the listed columns:
# only rows with a null Sales value are dropped.
df.na.drop(subset = ['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [0]:
# fill method
# Print the schema first to see which columns are strings and which are numeric.
df.printSchema()

# A string fill value is applied to string columns only: nulls in Name are
# replaced, while the nulls in the double column Sales are left as they are.
df.na.fill('FILL VALUE').show()

root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sales: double (nullable = true)

+----+----------+-----+
|  Id|      Name|Sales|
+----+----------+-----+
|emp1|      John| null|
|emp2|FILL VALUE| null|
|emp3|FILL VALUE|345.0|
|emp4|     Cindy|456.0|
+----+----------+-----+



In [0]:
# Fill per column by naming it in `subset`: 0 for missing Sales ...
df.na.fill(0, subset = ['Sales']).show()
# ... and a placeholder string for missing Name values.
df.na.fill('NO NAME', subset = ['Name']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  0.0|
|emp2| null|  0.0|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| null|
|emp2|NO NAME| null|
|emp3|NO NAME|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



In [0]:
from pyspark.sql.functions import mean

# Average of the Sales column, pulled out of the one-row aggregate result
# by its alias so that `mean_val` ends up holding just the number.
mean_val = df.agg(mean("Sales").alias("Sale Mean")).collect()[0]["Sale Mean"]
mean_val

Out[28]: 400.5

In [0]:
# Replace the nulls in Sales with `mean_val`.
df.na.fill(mean_val, subset = ['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



# Dates and Timestamps

In [0]:
table_name = "appl_stock_data"
# Read the whole table by name through the SparkSession instead of building a
# `SELECT *` string for the legacy `sqlContext`.
stock_df = spark.table(table_name)
# Round-trip through the RDD so Spark re-infers the schema from the Python rows.
# NOTE(review): presumably done to normalise the column types - confirm it is
# still needed before removing it.
stock_df = spark.createDataFrame(stock_df.rdd)
stock_df.show(5)

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|213.42999267578125|             214.5| 212.3800048828125|214.00999450683594|123432400|27.727039337158203|
|2010-01-05 00:00:00|214.59999084472656|215.58999633789062|            213.25|214.37998962402344|150476200| 27.77497673034668|
|2010-01-06 00:00:00|214.37998962402344|215.22999572753906|            210.75|210.97000122070312|138040000| 27.33317756652832|
|2010-01-07 00:00:00|            211.75|             212.0| 209.0500030517578| 210.5800018310547|119282800|27.282649993896484|
|2010-01-08 00:00:00|210.29998779296875|             212.0|209.05999755859375|211.98001098632812|111902700|27.4

In [0]:
# `head(1)` returns a list containing the first Row.
stock_df.head(1)

Out[4]: [Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.42999267578125, High=214.5, Low=212.3800048828125, Close=214.00999450683594, Volume=123432400, Adj Close=27.727039337158203)]

In [0]:
# Same first Row, viewed as a dict of column name -> value.
stock_df.head(1)[0].asDict()

Out[7]: {'Date': datetime.datetime(2010, 1, 4, 0, 0),
 'Open': 213.42999267578125,
 'High': 214.5,
 'Low': 212.3800048828125,
 'Close': 214.00999450683594,
 'Volume': 123432400,
 'Adj Close': 27.727039337158203}

In [0]:
# Print only the Date and Close columns.
stock_df.select('Date', 'Close').show()

+-------------------+------------------+
|               Date|             Close|
+-------------------+------------------+
|2010-01-04 00:00:00|214.00999450683594|
|2010-01-05 00:00:00|214.37998962402344|
|2010-01-06 00:00:00|210.97000122070312|
|2010-01-07 00:00:00| 210.5800018310547|
|2010-01-08 00:00:00|211.98001098632812|
|2010-01-11 00:00:00|210.11000061035156|
|2010-01-12 00:00:00|207.72000122070312|
|2010-01-13 00:00:00|210.65000915527344|
|2010-01-14 00:00:00|209.42999267578125|
|2010-01-15 00:00:00|205.92999267578125|
|2010-01-19 00:00:00| 215.0399932861328|
|2010-01-20 00:00:00|211.72999572753906|
|2010-01-21 00:00:00| 208.0699920654297|
|2010-01-22 00:00:00|            197.75|
|2010-01-25 00:00:00|203.07000732421875|
|2010-01-26 00:00:00|205.94000244140625|
|2010-01-27 00:00:00| 207.8800048828125|
|2010-01-28 00:00:00| 199.2899932861328|
|2010-01-29 00:00:00|192.05999755859375|
|2010-02-01 00:00:00|194.72999572753906|
+-------------------+------------------+
only showing top

In [0]:
from pyspark.sql.functions import format_number
# Show the raw closing price next to the same value formatted to 3 decimals,
# which is the three-column layout of this cell's recorded output.
# `format_number` produces a string column, so "Close rounded" is for display only.
result_df = stock_df.select("Date",
                            "Close",
                            format_number("Close", 3).alias("Close rounded"))

# display the result DataFrame
result_df.show()

+-------------------+------------------+-------------+
|               Date|             Close|Close rounded|
+-------------------+------------------+-------------+
|2010-01-04 00:00:00|214.00999450683594|      214.010|
|2010-01-05 00:00:00|214.37998962402344|      214.380|
|2010-01-06 00:00:00|210.97000122070312|      210.970|
|2010-01-07 00:00:00| 210.5800018310547|      210.580|
|2010-01-08 00:00:00|211.98001098632812|      211.980|
|2010-01-11 00:00:00|210.11000061035156|      210.110|
|2010-01-12 00:00:00|207.72000122070312|      207.720|
|2010-01-13 00:00:00|210.65000915527344|      210.650|
|2010-01-14 00:00:00|209.42999267578125|      209.430|
|2010-01-15 00:00:00|205.92999267578125|      205.930|
|2010-01-19 00:00:00| 215.0399932861328|      215.040|
|2010-01-20 00:00:00|211.72999572753906|      211.730|
|2010-01-21 00:00:00| 208.0699920654297|      208.070|
|2010-01-22 00:00:00|            197.75|      197.750|
|2010-01-25 00:00:00|203.07000732421875|      203.070|
|2010-01-2

In [0]:
from pyspark.sql.functions import dayofmonth, hour, dayofyear, month, year, weekofyear, date_format, stddev, median

In [0]:
from pyspark.sql import functions as F

# Per-calendar-month statistics of the closing price.
#
# The aggregates run on the numeric `Close` column itself. Formatting it with
# `format_number` first would turn it into a string column (pre-rounded, and
# with thousands separators for values >= 1000), which is not a reliable input
# for mean / stddev / median. Formatting and rounding are applied only to the
# aggregated results.
stock_by_month = (
    stock_df
    .groupBy(month(stock_df['Date']).alias('Month'))
    .agg(
        format_number(mean('Close'), 2).alias('Mean Close'),
        format_number(stddev('Close'), 2).alias('Std Close'),
        # Kept numeric (rounded to 3 decimals) rather than formatted to a string.
        F.round(median('Close'), 3).alias('Median Close'),
    )
)

# Print the twelve monthly rows in calendar order.
stock_by_month.orderBy("Month").show()

+-----+----------+---------+------------+
|Month|Mean Close|Std Close|Median Close|
+-----+----------+---------+------------+
|    1|    322.21|   169.25|     341.025|
|    2|    321.36|   169.28|      351.88|
|    3|    332.91|   177.63|      348.51|
|    4|    340.51|   179.94|     338.485|
|    5|    351.62|   187.23|      346.57|
|    6|    288.13|   179.07|     267.775|
|    7|    281.72|   180.38|     255.745|
|    8|    300.44|   199.02|      251.79|
|    9|    301.08|   203.94|      269.14|
|   10|    308.31|   197.88|     301.645|
|   11|    306.27|   182.35|      313.36|
|   12|    302.35|   183.29|      320.61|
+-----+----------+---------+------------+

