<a href="https://colab.research.google.com/github/D-Vaibhav/google_colab/blob/pyspark/01_pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# pyspark has to be imported before its version can be read, and the version
# string lives in the dunder attribute ``__version__`` (not ``__version``).
# Requires pyspark to be installed (see the ``pip install pyspark`` cell).
import pyspark

print(pyspark.__version__)

NameError: ignored

In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


https://www.datacamp.com/tutorial/pyspark-tutorial-getting-started-with-pyspark

https://medium.com/walmartglobaltech/decoding-memory-in-spark-parameters-that-are-often-confused-c11be7488a24

https://sparkbyexamples.com/spark/spark-show-display-dataframe-contents-in-table/

https://sparkbyexamples.com/pyspark/pyspark-count/

https://sparkbyexamples.com/pyspark/pyspark-where-filter/

In [None]:
%%sh
# List the CSV files under ./sample_data in long format (permissions, owner, size, date).
ls -l ./sample_data/*.csv

-rw-r--r-- 1 root root   301141 Mar 20 13:36 ./sample_data/california_housing_test.csv
-rw-r--r-- 1 root root  1706430 Mar 20 13:36 ./sample_data/california_housing_train.csv
-rw-r--r-- 1 root root 18289443 Mar 20 13:36 ./sample_data/mnist_test.csv
-rw-r--r-- 1 root root 36523880 Mar 20 13:36 ./sample_data/mnist_train_small.csv


In [None]:
# 1. IMPORTING PACKAGES ======================================================================================
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, round, count, desc, sum


# 2. SPARK SESSION SETUP AND MAIN ENTRY POINT =================================================================
def init_spark():
    """Build (or fetch the already-running) Spark session for this notebook.

    Returns:
        A ``(spark, sc)`` tuple: the ``SparkSession`` configured with master
        ``local[*]`` and application name ``quick_revision``, and that
        session's ``sparkContext``.
    """
    session = (
        SparkSession.builder
        .master('local[*]')
        .appName("quick_revision")
        .getOrCreate()
    )
    return session, session.sparkContext


def main():
    """Run the whole demo: start Spark, load the CSV, then exercise both APIs.

    Prints the Spark context, loads the housing data through
    ``read_csv_data`` and passes the resulting DataFrame first to the
    DataFrame-API walkthrough and then to the Spark-SQL walkthrough.
    """
    session, context = init_spark()
    print(context)

    housing = read_csv_data(session, context)

    # Each walkthrough is announced with a banner so its output is easy to find.
    print('calling function => basic_operation_via_df_api --------------------------')
    basic_operation_via_df_api(housing)

    print('calling function => basic_operation_via_spark_sql --------------------------')
    basic_operation_via_spark_sql(session, housing)



# 3. AD-HOC FUNCTIONS =======================================================================================
def read_csv_data(spark, sc):
    """Load the California housing test CSV into a DataFrame and preview it.

    Prints the schema and the first 7 rows as a side effect.

    Args:
        spark: Session whose ``read.csv`` is used to load the file.
        sc: Spark context; accepted for call compatibility but not used here.

    Returns:
        The DataFrame read from ``./sample_data/california_housing_test.csv``,
        with the first line of the file used as the column names.
    """
    data_path = './sample_data/california_housing_test.csv'

    # The reader's default escape character is kept. The earlier
    # ``escape=','`` made the field delimiter double as the quote-escape
    # character, so a quoted field ending in a comma would not be closed
    # where it should be.
    # NOTE(review): no schema / inferSchema is passed, so every column is read
    # as a string; downstream numeric functions rely on Spark's implicit casts.
    california_housing_test = spark.read.csv(data_path, header=True)
    california_housing_test.printSchema()
    california_housing_test.show(7)

    return california_housing_test


def basic_operation_via_df_api(df):
    """Walk through common DataFrame-API operations on ``df``, printing each result.

    Demonstrates, in order: a row count, three ways of referencing columns in
    ``select`` (the second one also de-duplicates rows), rounding with
    aliases, groupBy/agg/orderBy, a grand total of the grouped counts, and
    ``filter``/``where`` predicates.

    Note: ``round`` and ``sum`` used below are the ``pyspark.sql.functions``
    versions imported at the top of the file, not the Python builtins.

    Args:
        df: DataFrame that has ``longitude`` and ``latitude`` columns.

    Returns:
        None. Everything is reported through ``print`` and ``show``.
    """

    def preview(frame):
        # Print the DataFrame object itself, then its first 7 rows.
        print(frame)
        frame.show(7)

    total_rows = df.count()
    print(f'total_rows are {total_rows}\n\n')

    # --- selecting columns: by name, by attribute (+ distinct), via col() ---
    by_name = df.select('longitude', 'latitude')
    preview(by_name)
    print('\n\n')

    by_attribute = df.select(df.longitude, df.latitude).distinct()
    preview(by_attribute)
    print('\n\n')

    by_col = df.select(col('longitude'), col('latitude'))
    preview(by_col)
    print('\n\n')

    # --- round + alias: keep the original columns and add rounded copies ---
    rounded_longitude = round('longitude').alias('long')
    rounded_latitude = round('latitude').alias('lati')
    rounded = by_name.select('*', rounded_longitude, rounded_latitude)
    preview(rounded)
    print(f'df1.count() is {rounded.count()} \n\n')

    # --- groupBy / agg / orderBy: rows per rounded longitude, descending ---
    per_longitude = (
        rounded
        .groupBy('long')
        .agg(count('lati'))
        .orderBy(desc('long'))
    )
    preview(per_longitude)
    print(f'df10.count() is {per_longitude.count()} \n\n')

    # --- grand total: sum the per-group counts back into a single row ---
    # The aggregate column is named 'count(lati)', so it is renamed first.
    group_counts = per_longitude.select(col('count(lati)').alias('lati'))
    grand_total = group_counts.agg(sum('lati'))
    preview(grand_total)
    print(f'df_temp.count() is {grand_total.count()} \n\n')

    # --- where / filter: single predicate, then two combined with & ---
    latitude_is_37 = rounded.lati == '37.0'
    rounded.filter(latitude_is_37).show(7)

    longitude_matches = rounded.longitude == '-122.050000'
    rounded.where(latitude_is_37 & longitude_matches).show(7)


    
    




def basic_operation_via_spark_sql(spark, df):
    """Query ``df`` through Spark SQL and print the result.

    Registers ``df`` as the temporary view ``df``, selects every row from it
    with a SQL statement, then prints the resulting DataFrame, its first 7
    rows and its row count.

    Args:
        spark: Session used to run the SQL statement.
        df: DataFrame to expose to SQL under the view name ``df``.
    """
    view_name = 'df'
    # A DataFrame has to be registered as a view before SQL can refer to it.
    df.createOrReplaceTempView(view_name)

    everything = spark.sql('select * from df')
    print(everything)
    everything.show(7)
    row_total = everything.count()
    print(f'df1.count() is {row_total} \n\n')


# Script-style entry point: run the full demo only when this code is executed
# directly rather than imported as a module.
if __name__ == '__main__':
    main()


<SparkContext master=local[*] appName=quick_revision>
root
 |-- longitude: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- housing_median_age: string (nullable = true)
 |-- total_rooms: string (nullable = true)
 |-- total_bedrooms: string (nullable = true)
 |-- population: string (nullable = true)
 |-- households: string (nullable = true)
 |-- median_income: string (nullable = true)
 |-- median_house_value: string (nullable = true)

+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+
|  longitude| latitude|housing_median_age|total_rooms|total_bedrooms| population|households|median_income|median_house_value|
+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+
|-122.050000|37.370000|         27.000000|3885.000000|    661.000000|1537.000000|606.000000|     6.608500|     344700.000000|
|-118.300000|34.260000|         43.00