In [1]:
from pyspark.sql import SparkSession

In [2]:
# Start (or reuse) the Spark session; later cells refer to it as `spark`.
spark = SparkSession.builder.appName('Basics').getOrCreate()

In [3]:
# Read the JSON file into a DataFrame. No schema is passed, so Spark infers one.
df = spark.read.json('/FileStore/tables/people.json')

In [4]:
# Print a preview of the rows.
df.show()

In [5]:
# Print the column names and types Spark inferred.
df.printSchema()

In [6]:
# Column names as a Python list.
df.columns

In [7]:
# describe() returns a DataFrame of summary statistics; show() prints it.
df.describe().show()

In [8]:
from pyspark.sql.types import (StructField, StringType, 
                               IntegerType, StructType)

In [9]:
# One StructField per column; the third argument (True) marks the column as nullable.
data_schema = [
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ('age', IntegerType()),
        ('name', StringType()),
    )
]

In [10]:
# Wrap the field list in a StructType so it can be passed to the reader as a schema.
final_struc = StructType(fields = data_schema)

In [11]:
# Re-read the same file, this time with the explicit schema instead of inference.
# `df` is rebound to the new DataFrame.
df = spark.read.json('/FileStore/tables/people.json', schema = final_struc)

In [12]:
# Print the schema to check that the declared types were applied.
df.printSchema()

In [13]:
df.show()

In [14]:
# Indexing a DataFrame by column name gives a Column object, not the data itself.
type(df['age'])

In [15]:
# select() returns a DataFrame holding just that column, which can be shown.
df.select('age').show()

In [16]:
type(df.select('age'))

In [17]:
# head(n) returns the first n rows as a Python list.
df.head(2)

In [18]:
# Index into that list to get a single row.
df.head(2)[0]

In [19]:
type(df.head(2)[0])

In [20]:
# Select several columns at once by passing a list of names.
df.select(['age', 'name']).show()

In [21]:
# withColumn() returns a new DataFrame with the extra column.
# The result is only shown here; it is not assigned back to df.
df.withColumn('newage', df['age']).show()

In [22]:
# The new column can be any Column expression, here age multiplied by 2.
df.withColumn('double_age', df['age']*2).show()

In [23]:
# Show df again: none of the withColumn() results above were assigned to it.
df.show()

In [24]:
# withColumnRenamed() also returns a new DataFrame rather than changing df.
df.withColumnRenamed('age', 'my_new_age').show()

In [25]:
df.show()

In [26]:
# Add 2 to every age. The column label names that operation so the output is
# not confused with the age*2 column labelled 'double_age'.
df.withColumn('age_plus_two', df['age']+2).show()

In [27]:
# Register the DataFrame as a temporary view so SQL can query it as 'people'.
df.createOrReplaceTempView('people')

In [28]:
# spark.sql() returns a DataFrame; nothing is printed until show() is called.
result = spark.sql("SELECT * FROM people")

In [29]:
result.show()

In [30]:
# Keep the query result as a DataFrame. show() only prints and returns None,
# so chaining it onto the assignment would leave new_result bound to None.
new_result = spark.sql("SELECT * FROM people WHERE age = 30")
new_result.show()

In [31]:
# getOrCreate() hands back the session that is already running if there is one,
# so on a kernel that ran the earlier cells this does not start a second session.
spark = SparkSession.builder.appName('ops').getOrCreate()

In [32]:
# `df` is rebound to the stock CSV. header=True takes column names from the
# first line; inferSchema=True asks Spark to work out the column types.
df = spark.read.csv('/FileStore/tables/appl_stock.csv', inferSchema=True, header=True)

In [33]:
df.printSchema()

In [34]:
df.show()

In [35]:
# First three rows as a list.
df.head(3)

In [36]:
# The first of those rows.
df.head(3)[0]

In [37]:
# filter() accepts a SQL-style condition string...
df.filter("Close < 500").show()

In [38]:
# Filter first, then keep only one column of the matching rows.
df.filter("Close < 500").select('open').show()

In [39]:
df.filter("Close < 500").select('open', 'Close').show()

In [40]:
# ...or the same condition written as a Column expression.
df.filter(df['Close'] < 500).show()

In [41]:
df.filter(df['Close'] < 500).select('Volume').show()

In [42]:
# Combine conditions with &. Each comparison needs its own parentheses because
# Python's & binds more tightly than < and >.
df.filter( (df['Close'] < 200) & (df['Open'] > 200) ).show()

In [43]:
# ~ negates a condition: Close below 200 and Open NOT above 200.
df.filter( (df['Close'] < 200) & ~(df['Open'] > 200) ).show()

In [44]:
# Exact equality against a floating-point literal.
# NOTE(review): this matches only rows whose stored Low is exactly 197.16 - confirm that is intended.
df.filter(df['Low'] == 197.16).show()

In [45]:
# collect() brings the matching rows back as a Python list. `result` is rebound here.
result = df.filter(df['Low'] == 197.16).collect()

In [46]:
result

In [47]:
# Assumes the filter matched at least one row; indexing an empty list raises IndexError.
result[0]

In [48]:
row = result[0]

In [49]:
# Convert the row to a plain dict keyed by column name.
row.asDict()

In [50]:
# Look up a single field by its column name.
row.asDict()['Volume']

In [51]:
# GroupBy & Aggregate
# getOrCreate() returns the already-running session when one exists.
spark = SparkSession.builder.appName("aggs").getOrCreate()

In [52]:
# `df` is rebound to the sales data for this section.
df = spark.read.csv('/FileStore/tables/sales_info.csv', inferSchema=True, header=True)

In [53]:
df.show()

In [54]:
df.printSchema()

In [55]:
# groupBy() alone returns a grouped-data object; an aggregate must be applied to get rows.
df.groupBy('Company')

In [56]:
# Applying an aggregate gives a DataFrame again (displayed here without show()).
# NOTE(review): 'company' is lower-case from here on but 'Company' above; this
# presumably relies on Spark's default case-insensitive column matching - confirm.
df.groupBy('company').mean()

In [57]:
# Per-company mean of the numeric columns.
df.groupBy('company').mean().show()

In [58]:
# Per-company sum.
df.groupBy('company').sum().show()

In [59]:
# Per-company maximum.
df.groupBy('company').max().show()

In [60]:
# Per-company minimum.
df.groupBy('company').min().show()

In [61]:
# Number of rows per company.
df.groupBy('company').count().show()

In [62]:
# agg() with a {column: function} dict, called on the DataFrame itself,
# aggregates over all rows (no grouping).
df.agg( {'Sales': 'sum'} ).show()

In [63]:
df.agg( {'Sales': 'max'} ).show()

In [64]:
# Keep the grouped object so aggregates can be applied to it afterwards.
grouped_data = df.groupBy("Company") 

In [65]:
# Same dict form, now evaluated once per company.
grouped_data.agg({'Sales': 'max'}).show()

In [66]:
from pyspark.sql.functions import countDistinct, avg,stddev 

In [67]:
# Aggregate functions can also be used inside select(); each call below
# produces a single summary value for the Sales column.
df.select(countDistinct('Sales')).show()

In [68]:
df.select(avg('Sales')).show()

In [69]:
# alias() sets the name of the output column.
df.select(avg('Sales').alias('Average Sales')).show()

In [70]:
df.select(stddev('Sales')).show()

In [71]:
from pyspark.sql.functions import format_number

In [72]:
# Keep the standard deviation as its own DataFrame, in a column named 'std dev'.
sales_std = df.select(stddev('Sales').alias('std dev'))

In [73]:
sales_std.show()

In [74]:
# format_number(col, 2) formats the value with two decimal places;
# the formatted column is aliased back to 'std dev'.
sales_std.select(format_number('std dev', 2).alias('std dev')).show()

In [75]:
df.show()

In [76]:
# orderBy() with a column name sorts ascending.
df.orderBy('Sales').show()

In [77]:
# Use the Column's desc() for a descending sort.
df.orderBy(df['Sales'].desc()).show()

In [78]:
# Missing Data

# `df` is rebound again; going by the file name, this dataset presumably contains nulls.
df = spark.read.csv('/FileStore/tables/ContainsNull.csv', header=True, inferSchema=True)

In [79]:
df.show()

In [80]:
# Dropping missing data
# With no arguments, drop() removes every row that has at least one null.
df.na.drop().show()

In [81]:
# thresh=2 keeps only rows that have at least two non-null values.
df.na.drop(thresh=2).show()

In [82]:
# how='any' is the default, so this matches the no-argument call above.
df.na.drop(how='any').show()

In [83]:
# how='all' drops a row only when every column in it is null.
df.na.drop(how='all').show()

In [84]:
# subset limits the null check to the listed columns.
df.na.drop(subset=['Sales']).show()

In [85]:
# Check the column types first: fill() only touches columns whose type matches the fill value.
df.printSchema()

In [86]:
# A string fill value is applied to string columns only.
df.na.fill('FILL VALUE').show() # Only the name column will be filled

In [87]:
# A numeric fill value is applied to numeric columns only.
df.na.fill(0).show() # Now the sales values will be filled

In [88]:
# Restrict the fill to one column with subset.
df.na.fill('No Name', subset=['Name']).show()

In [89]:
from pyspark.sql.functions import mean

In [90]:
# collect() returns a list of rows; this aggregate yields one row with one value, the mean.
mean_value = df.select(mean(df['Sales'])).collect()

In [91]:
mean_value

In [92]:
# First row, first field: the mean as a plain Python value.
mean_value[0][0]

In [93]:
mean_sales = mean_value[0][0]

In [94]:
# Replace nulls in Sales with the column mean.
df.na.fill(mean_sales, ['Sales']).show()

In [95]:
# The same fill as a single expression, recomputing the mean inline.
# NOTE(review): 'sales' is lower-case here; this presumably relies on
# case-insensitive column matching - confirm.
df.na.fill(df.select(mean(df['sales'])).collect()[0][0], ['Sales']).show()

In [96]:
# Dates & Timeframes
# getOrCreate() returns the already-running session when one exists.
spark = SparkSession.builder.appName('dates').getOrCreate()

In [97]:
# `df` is rebound to the stock data again.
df = spark.read.csv("/FileStore/tables/appl_stock.csv", header=True, inferSchema=True)

In [98]:
df.head(1)

In [99]:
df.show()

In [100]:
# Narrow the view to the two columns of interest.
df.select(['Date', 'Open']).show()

In [101]:
from pyspark.sql.functions import (dayofmonth, hour, dayofyear, 
                                  month, year, weekofyear,
                                  format_number, date_format)

In [102]:
# Each helper below extracts one component from the Date column.
# NOTE(review): 'date' and 'Date' are both used; this presumably relies on
# case-insensitive column matching - confirm.
df.select(dayofmonth(df['date'])).show()

In [103]:
df.select(hour(df['date'])).show()

In [104]:
df.select(month(df['date'])).show()

In [105]:
df.select(year(df['Date'])).show()

In [106]:
# Preview the DataFrame with a derived Year column...
df.withColumn("Year", year(df['Date'])).show()

In [107]:
# ...and keep it under a new name so it can be grouped.
df_new = df.withColumn("Year", year(df['Date']))

In [108]:
# mean() with no arguments averages the numeric columns per group;
# only the Close average is selected for display.
df_new.groupBy("Year").mean().select(["Year", "avg(Close)"]).show()

In [109]:
# Same query, stored in `result` (rebinding that name) for later cells.
result = df_new.groupBy("Year").mean().select(["Year", "avg(Close)"])

In [110]:
new = result.withColumnRenamed("avg(Close)", "Average Closign Price")

In [111]:
new.select(['Year', format_number("Average Closign Price", 2).alias("Avg Close")]).show()

In [112]:
from pyspark.sql import SparkSession

In [113]:
# getOrCreate() returns the already-running session when one exists.
spark = SparkSession.builder.appName("solution").getOrCreate()

In [114]:
# `df` is rebound to the Walmart stock data for the cells that follow.
df = spark.read.csv("/FileStore/tables/walmart_stock.csv", header=True, inferSchema=True)

In [115]:
# Column names.
df.columns

In [116]:
# Column types Spark inferred.
df.printSchema()

In [117]:
# First five rows as a list.
df.head(5)

In [118]:
# Print the first five rows one per line, each followed by two blank lines.
for row in df.head(5):
    print(row, end='\n\n\n')

In [119]:
# Summary statistics for the stock data.
df.describe().show()

In [120]:
# Inspect the column types of the describe() output before formatting it.
df.describe().printSchema()

In [121]:
from pyspark.sql.functions import format_number

In [122]:
# Keep the summary as a DataFrame; `result` is rebound here.
result = df.describe()

In [123]:
# Show the describe() table with every statistic rendered to two decimals.
# The four price columns are cast to float; Volume is cast to int before formatting.
result.select(
    result['summary'],
    *[
        format_number(result[price_col].cast('float'), 2).alias(price_col)
        for price_col in ('Open', 'High', 'Low', 'Close')
    ],
    format_number(result['Volume'].cast('int'), 2).alias('Volume')
).show()

In [124]:
# Add a column holding the ratio of High to Volume.
df2 = df.withColumn('HV Ratio', df['High']/df['Volume'])
df2.select('HV Ratio').show()

In [125]:
# Sort by High descending, take the top row, and read its first field.
# NOTE(review): [0][0] is positional - presumably the first column is Date; confirm against df.columns.
df2.orderBy(df['High'].desc()).head(1)[0][0]

In [126]:
from pyspark.sql.functions import mean
# Mean of Close, formatted to two decimal places.
df.select(format_number(mean("Close"), 2)).show()

In [127]:
# Import the functions module under an alias rather than importing max and min
# by name, which would replace Python's built-in max() and min() for every
# later cell in the kernel.
from pyspark.sql import functions as F
# Largest and smallest Volume in the dataset.
df.select(F.max('Volume'), F.min('Volume')).show()

In [128]:
# Number of rows with Close below 60, using a SQL-style condition string.
df.filter('Close < 60').count()

In [129]:
# The same count using a Column expression.
df.filter(df['Close'] < 60).count()

In [130]:
from pyspark.sql.functions import count

In [131]:
# Keep the filtered rows; `result` is rebound here.
result = df.filter(df['Close'] < 60)

In [132]:
# A third way: the count() aggregate function applied to the filtered rows.
result.select(count('Close')).show()

In [133]:
# Percentage of rows whose High is above 80. The count is converted to a float
# first so the division is never an integer division.
float(df.filter(df['High'] > 80).count()) / df.count() * 100

In [134]:
from pyspark.sql.functions import corr
# Correlation between High and Volume.
df.select(corr('High', 'Volume')).show()

In [135]:
from pyspark.sql.functions import year
# Add a Year column derived from Date.
yeardf = df.withColumn('Year', year(df['Date']))

In [136]:
# max() with no arguments takes the per-year maximum of the numeric columns;
# only max(High) is selected for display.
max_df = yeardf.groupby('Year').max()
max_df.select('Year', 'max(High)').show()

In [137]:
from pyspark.sql.functions import month

In [138]:
# Add a Month column derived from Date.
monthdf = df.withColumn('Month', month('Date'))

In [139]:
# Average Close per month: narrow to the two needed columns, then group.
monthdfavgs = monthdf.select(['Month', 'Close']).groupby('Month').mean()

In [140]:
# Sort by Month so the rows are printed in month order.
monthdfavgs.select('Month', 'avg(Close)').orderBy('Month').show()