## Installation and imports

In [1]:
 !pip install pyspark==3.5.0

Collecting pyspark==3.5.0
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l- \ done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l- \ | done
[?25h  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=92fb7a61c1e74021b235065131104ae751e388fa6f9629c6d58a601b011b79a9
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [2]:
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [3]:
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, sum, avg, min, max, mean, count, to_timestamp, lit, lower, upper, substring, expr, to_date, date_add, date_sub, lpad
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
from pyspark.sql import Row

In [4]:
# help to see how to use a functions
help(substring)

Help on function substring in module pyspark.sql.functions:

substring(str: 'ColumnOrName', pos: int, len: int) -> pyspark.sql.column.Column
    Substring starts at `pos` and is of length `len` when str is String type or
    returns the slice of byte array that starts at `pos` in byte and is of length `len`
    when str is Binary type.
    
    .. versionadded:: 1.5.0
    
    .. versionchanged:: 3.4.0
        Supports Spark Connect.
    
    Notes
    -----
    The position is not zero based, but 1 based index.
    
    Parameters
    ----------
    str : :class:`~pyspark.sql.Column` or str
        target column to work on.
    pos : int
        starting position in str.
    len : int
        length of chars.
    
    Returns
    -------
    :class:`~pyspark.sql.Column`
        substring of given value.
    
    Examples
    --------
    >>> df = spark.createDataFrame([('abcd',)], ['s',])
    >>> df.select(substring(df.s, 1, 2).alias('s')).collect()
    [Row(s='ab')]



## Session starts

In [5]:
spark = SparkSession.builder.appName("TitanicData").getOrCreate()
file_path = '/kaggle/input/titanic/train.csv'

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/22 10:41:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Schema

In [6]:
# Define schema
schema = StructType([
    StructField("PassengerId", IntegerType(), True),
    StructField("Survived", IntegerType(), True),
    StructField("Pclass", IntegerType(), True),
    StructField("Name", StringType(), True),
    StructField("Sex", StringType(), True),
    StructField("Age", FloatType(), True),
    StructField("SibSp", IntegerType(), True),
    StructField("Parch", IntegerType(), True),
    StructField("Ticket", StringType(), True),
    StructField("Fare", FloatType(), True),
    StructField("Cabin", StringType(), True),
    StructField("Embarked", StringType(), True)
])

# Define schema with list comprehension
labels = [
   ("PassengerId", IntegerType()),
   ("Survived", IntegerType()),
   ("Pclass", IntegerType()),
   ("Name", StringType()),
   ("Sex", StringType()),
   ("Age", FloatType()),
   ("SibSp", IntegerType()),
   ("Parch", IntegerType()),
   ("Ticket", StringType()),
   ("Fare", FloatType()),
   ("Cabin", StringType()),
   ("Embarked", StringType())
]

schema=StructType([StructField(x[0],x[1],True) for x in labels])


df = ( spark.read
    .option("header", "true")
    .schema(schema)
    .csv(file_path)
   )

In [7]:
#print schema
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: float (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: float (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



## Working with rows and columns

In [8]:
# Add column with date of birth to work with dates later
# expr function in PySpark is used to execute SQL expressions and return a new column
df = df.withColumn(
    "DateOfBirth",
    expr("date_add(to_date('1912-04-15', 'yyyy-MM-dd'), -cast(Age * 365 as int)) + cast(rand() * 365 as int)")
)

# Show a few rows to verify
df.select("PassengerId", "Age", "DateOfBirth").show()

                                                                                

+-----------+----+-----------+
|PassengerId| Age|DateOfBirth|
+-----------+----+-----------+
|          1|22.0| 1890-11-07|
|          2|38.0| 1875-01-16|
|          3|26.0| 1886-12-30|
|          4|35.0| 1878-01-06|
|          5|35.0| 1877-10-30|
|          6|NULL|       NULL|
|          7|54.0| 1859-01-23|
|          8| 2.0| 1911-03-07|
|          9|27.0| 1885-12-29|
|         10|14.0| 1898-07-11|
|         11| 4.0| 1908-06-10|
|         12|58.0| 1855-02-12|
|         13|20.0| 1892-08-03|
|         14|39.0| 1873-12-14|
|         15|14.0| 1899-03-10|
|         16|55.0| 1858-01-24|
|         17| 2.0| 1910-07-05|
|         18|NULL|       NULL|
|         19|31.0| 1881-11-06|
|         20|NULL|       NULL|
+-----------+----+-----------+
only showing top 20 rows



In [9]:
# find the earliest and latest dates
df.select(min('DateOfBirth'),max('DateOfBirth')).show()

+----------------+----------------+
|min(DateOfBirth)|max(DateOfBirth)|
+----------------+----------------+
|      1832-09-14|      1912-08-24|
+----------------+----------------+



In [10]:
# +3 and -3
df.select(date_sub(min('DateOfBirth'),3),date_add(max('DateOfBirth'),3)).show()

+-----------------------------+-----------------------------+
|date_sub(min(DateOfBirth), 3)|date_add(max(DateOfBirth), 3)|
+-----------------------------+-----------------------------+
|                   1832-09-11|                   1912-08-27|
+-----------------------------+-----------------------------+



In [11]:
# showing n rows
df.select("Name").show(n=10, truncate=False)

+---------------------------------------------------+
|Name                                               |
+---------------------------------------------------+
|Braund, Mr. Owen Harris                            |
|Cumings, Mrs. John Bradley (Florence Briggs Thayer)|
|Heikkinen, Miss. Laina                             |
|Futrelle, Mrs. Jacques Heath (Lily May Peel)       |
|Allen, Mr. William Henry                           |
|Moran, Mr. James                                   |
|McCarthy, Mr. Timothy J                            |
|Palsson, Master. Gosta Leonard                     |
|Johnson, Mrs. Oscar W (Elisabeth Vilhelmina Berg)  |
|Nasser, Mrs. Nicholas (Adele Achem)                |
+---------------------------------------------------+
only showing top 10 rows



In [12]:
# upper, lower and substring functions
df.select(upper('Name'), lower('Name'),substring('Name',0,3)).show(10)

+--------------------+--------------------+---------------------+
|         upper(Name)|         lower(Name)|substring(Name, 0, 3)|
+--------------------+--------------------+---------------------+
|BRAUND, MR. OWEN ...|braund, mr. owen ...|                  Bra|
|CUMINGS, MRS. JOH...|cumings, mrs. joh...|                  Cum|
|HEIKKINEN, MISS. ...|heikkinen, miss. ...|                  Hei|
|FUTRELLE, MRS. JA...|futrelle, mrs. ja...|                  Fut|
|ALLEN, MR. WILLIA...|allen, mr. willia...|                  All|
|    MORAN, MR. JAMES|    moran, mr. james|                  Mor|
|MCCARTHY, MR. TIM...|mccarthy, mr. tim...|                  McC|
|PALSSON, MASTER. ...|palsson, master. ...|                  Pal|
|JOHNSON, MRS. OSC...|johnson, mrs. osc...|                  Joh|
|NASSER, MRS. NICH...|nasser, mrs. nich...|                  Nas|
+--------------------+--------------------+---------------------+
only showing top 10 rows



In [13]:
# check unique values
df.select('Survived').distinct().show()

+--------+
|Survived|
+--------+
|       1|
|       0|
+--------+



In [14]:
# find percentage of survived
survived = df.filter(col('Survived') == 1).count() / df.count()
survived

0.3838383838383838

In [15]:
#find the top 3 ages
grouped_df = df.groupBy('Age').agg(count('PassengerId').alias('Passengers_count')).orderBy(col('Passengers_count').desc())
grouped_df.select('Age').show(3)

+----+
| Age|
+----+
|NULL|
|24.0|
|22.0|
+----+
only showing top 3 rows



In [16]:
#collecting rows
df.take(3)

[Row(PassengerId=1, Survived=0, Pclass=3, Name='Braund, Mr. Owen Harris', Sex='male', Age=22.0, SibSp=1, Parch=0, Ticket='A/5 21171', Fare=7.25, Cabin=None, Embarked='S', DateOfBirth=datetime.date(1890, 11, 7)),
 Row(PassengerId=2, Survived=1, Pclass=1, Name='Cumings, Mrs. John Bradley (Florence Briggs Thayer)', Sex='female', Age=38.0, SibSp=1, Parch=0, Ticket='PC 17599', Fare=71.2833023071289, Cabin='C85', Embarked='C', DateOfBirth=datetime.date(1875, 1, 16)),
 Row(PassengerId=3, Survived=1, Pclass=3, Name='Heikkinen, Miss. Laina', Sex='female', Age=26.0, SibSp=0, Parch=0, Ticket='STON/O2. 3101282', Fare=7.925000190734863, Cabin=None, Embarked='S', DateOfBirth=datetime.date(1886, 12, 30))]

In [17]:
# Create a new column 'not_married' with 1 if 'Name' contains 'miss' or 'Miss', and 0 otherwise
df = df.withColumn(
    'not_married',
    when(col('Name').contains('miss') | col('Name').contains('Miss'), 1).otherwise(0)
)
df.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-----------+-----------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|DateOfBirth|not_married|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-----------+-----------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S| 1890-11-07|          0|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C| 1875-01-16|          0|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S| 1886-12-30|          1|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S| 1878-01-06|          0|
|          5|       0|     

In [18]:
# Create a new column with a constant value
df = df.withColumn(
    'one',
    lit(1))
df.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-----------+-----------+---+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|DateOfBirth|not_married|one|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-----------+-----------+---+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S| 1890-11-07|          0|  1|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C| 1875-01-16|          0|  1|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S| 1886-12-30|          1|  1|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S| 1878-01-06|          0|  1|

In [19]:
#padding
df=df.withColumn('paddedOne',lpad(col('One'),3,"0"))
df.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-----------+-----------+---+---------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|DateOfBirth|not_married|one|paddedOne|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-----------+-----------+---+---------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S| 1890-11-07|          0|  1|      001|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C| 1875-01-16|          0|  1|      001|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S| 1886-12-30|          1|  1|      001|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|        

In [20]:
# Select a specific row and column value
value = df.filter(col('PassengerId') == 3).select('Name').collect()[0]['Name']
value

'Heikkinen, Miss. Laina'

In [21]:
# Select columns by position
df_selected = df.select(df.columns[0:4])
df_selected.show(5)

+-----------+--------+------+--------------------+
|PassengerId|Survived|Pclass|                Name|
+-----------+--------+------+--------------------+
|          1|       0|     3|Braund, Mr. Owen ...|
|          2|       1|     1|Cumings, Mrs. Joh...|
|          3|       1|     3|Heikkinen, Miss. ...|
|          4|       1|     1|Futrelle, Mrs. Ja...|
|          5|       0|     3|Allen, Mr. Willia...|
+-----------+--------+------+--------------------+
only showing top 5 rows



In [22]:
#getting shape
def get_shape(df):
    num_rows = df.count()
    num_cols = len(df.columns)
    return (num_rows, num_cols)

# Get the shape of the DataFrame
shape = get_shape(df_selected)
shape

(891, 4)

In [23]:
# Concatenate DataFrames: add a new row
new_row = spark.createDataFrame([Row(PassengerId=1000, Survived=1, Pclass=3, Name='JJ')])
df_new = new_row.union(df_selected)
df_new.orderBy("PassengerId", ascending=False).show(5)


[Stage 30:>                                                         (0 + 4) / 5]

+-----------+--------+------+--------------------+
|PassengerId|Survived|Pclass|                Name|
+-----------+--------+------+--------------------+
|       1000|       1|     3|                  JJ|
|        891|       0|     3| Dooley, Mr. Patrick|
|        890|       1|     1|Behr, Mr. Karl Ho...|
|        889|       0|     3|"Johnston, Miss. ...|
|        888|       1|     1|Graham, Miss. Mar...|
+-----------+--------+------+--------------------+
only showing top 5 rows



                                                                                

In [24]:
# Sort values
df = df.orderBy(col('Pclass').asc(), col('Survived').desc())
df.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+-----------+-----------+---+---------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|  Ticket|   Fare|Cabin|Embarked|DateOfBirth|not_married|one|paddedOne|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+-----------+-----------+---+---------+
|         53|       1|     1|Harper, Mrs. Henr...|female|49.0|    1|    0|PC 17572|76.7292|  D33|       C| 1863-12-13|          0|  1|      001|
|        137|       1|     1|Newsom, Miss. Hel...|female|19.0|    0|    2|   11752|26.2833|  D47|       S| 1893-10-06|          1|  1|      001|
|         56|       1|     1|   Woolner, Mr. Hugh|  male|NULL|    0|    0|   19947|   35.5|  C52|       S|       NULL|          0|  1|      001|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|  113803|   53.1| C123|       S| 1878-01-06|          0|

In [25]:
# Drop duplicates
df = df.dropDuplicates(['PassengerId'])

In [26]:
# Drop columns
# Remove columns 'a' and 'b'
df_dropped = df.drop('Name', 'Ticket')
df_dropped.show(5)

24/05/22 10:41:31 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-----------+--------+------+------+----+-----+-----+-------+-----+--------+-----------+-----------+---+---------+
|PassengerId|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Cabin|Embarked|DateOfBirth|not_married|one|paddedOne|
+-----------+--------+------+------+----+-----+-----+-------+-----+--------+-----------+-----------+---+---------+
|          1|       0|     3|  male|22.0|    1|    0|   7.25| NULL|       S| 1890-11-07|          0|  1|      001|
|          2|       1|     1|female|38.0|    1|    0|71.2833|  C85|       C| 1875-01-16|          0|  1|      001|
|          3|       1|     3|female|26.0|    0|    0|  7.925| NULL|       S| 1886-12-30|          1|  1|      001|
|          4|       1|     1|female|35.0|    1|    0|   53.1| C123|       S| 1878-01-06|          0|  1|      001|
|          5|       0|     3|  male|35.0|    0|    0|   8.05| NULL|       S| 1877-10-30|          0|  1|      001|
+-----------+--------+------+------+----+-----+-----+-------+-----+--------+----

In [27]:
# Rename columns
df = df.withColumnRenamed('Pclass', 'Class')
df.show(5)

+-----------+--------+-----+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-----------+-----------+---+---------+
|PassengerId|Survived|Class|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|DateOfBirth|not_married|one|paddedOne|
+-----------+--------+-----+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-----------+-----------+---+---------+
|          1|       0|    3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S| 1890-11-07|          0|  1|      001|
|          2|       1|    1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C| 1875-01-16|          0|  1|      001|
|          3|       1|    3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S| 1886-12-30|          1|  1|      001|
|          4|       1|    1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          11380

In [28]:
# Group and aggregate
grouped_df = df.groupBy('Sex', 'Class','Survived').agg(count('PassengerId').alias('Passengers_count'))
grouped_df.show(50)

+------+-----+--------+----------------+
|   Sex|Class|Survived|Passengers_count|
+------+-----+--------+----------------+
|female|    1|       0|               3|
|female|    2|       0|               6|
|  male|    2|       0|              91|
|  male|    1|       0|              77|
|  male|    3|       0|             300|
|female|    3|       0|              72|
|  male|    2|       1|              17|
|female|    2|       1|              70|
|  male|    1|       1|              45|
|female|    1|       1|              91|
|  male|    3|       1|              47|
|female|    3|       1|              72|
+------+-----+--------+----------------+



In [29]:
#check nulls
# Initialize an empty dictionary to store the counts
null_counts = {}

# Iterate over each column in the DataFrame
for col_name in df.columns:
    # Count the number of null values in the column
    null_count = df.filter(col(col_name).isNull()).count()
    # Store the count in the dictionary
    null_counts[col_name] = null_count

# Print the null counts for each column
for col_name, count in null_counts.items():
    print(f"Column '{col_name}': {count} null values")

Column 'PassengerId': 0 null values
Column 'Survived': 0 null values
Column 'Class': 0 null values
Column 'Name': 0 null values
Column 'Sex': 0 null values
Column 'Age': 177 null values
Column 'SibSp': 0 null values
Column 'Parch': 0 null values
Column 'Ticket': 0 null values
Column 'Fare': 0 null values
Column 'Cabin': 687 null values
Column 'Embarked': 2 null values
Column 'DateOfBirth': 177 null values
Column 'not_married': 0 null values
Column 'one': 0 null values
Column 'paddedOne': 0 null values


In [30]:
mean_value = df.select(mean(col('Age'))).collect()[0][0]
print(mean_value)
# Replace null values with the mean value
df = df.withColumn('Age', when(col('Age').isNull(), mean_value).otherwise(col('Age')))

29.69911764704046


In [31]:
# Calculate ranks with partitions
from pyspark.sql.window import Window
from pyspark.sql.functions import rank
df = df.withColumn('Age', col('Age').cast('integer'))
windowSpec = Window.partitionBy("Sex").orderBy(col('Age').asc())
df = df.withColumn('rank', rank().over(windowSpec))
df.show(50)

+-----------+--------+-----+--------------------+------+---+-----+-----+-------------+--------+-------+--------+-----------+-----------+---+---------+----+
|PassengerId|Survived|Class|                Name|   Sex|Age|SibSp|Parch|       Ticket|    Fare|  Cabin|Embarked|DateOfBirth|not_married|one|paddedOne|rank|
+-----------+--------+-----+--------------------+------+---+-----+-----+-------------+--------+-------+--------+-----------+-----------+---+---------+----+
|        470|       1|    3|Baclini, Miss. He...|female|  0|    2|    1|         2666| 19.2583|   NULL|       C| 1912-04-03|          1|  1|      001|   1|
|        645|       1|    3|Baclini, Miss. Eu...|female|  0|    2|    1|         2666| 19.2583|   NULL|       C| 1911-07-22|          1|  1|      001|   1|
|        173|       1|    3|Johnson, Miss. El...|female|  1|    1|    1|       347742| 11.1333|   NULL|       S| 1911-07-19|          1|  1|      001|   3|
|        382|       1|    3|"Nakid, Miss. Mar...|female|  1|    

In [32]:
# Apply a condition
df = df.withColumn('age_rank', when(col('Age') < 12, 'child').otherwise('not_child'))
df.orderBy(col('Age').desc()).show(10)

+-----------+--------+-----+--------------------+----+---+-----+-----+----------+-------+-----+--------+-----------+-----------+---+---------+----+---------+
|PassengerId|Survived|Class|                Name| Sex|Age|SibSp|Parch|    Ticket|   Fare|Cabin|Embarked|DateOfBirth|not_married|one|paddedOne|rank| age_rank|
+-----------+--------+-----+--------------------+----+---+-----+-----+----------+-------+-----+--------+-----------+-----------+---+---------+----+---------+
|        631|       1|    1|Barkworth, Mr. Al...|male| 80|    0|    0|     27042|   30.0|  A23|       S| 1832-09-14|          0|  1|      001| 577|not_child|
|        852|       0|    3| Svensson, Mr. Johan|male| 74|    0|    0|    347060|  7.775| NULL|       S| 1838-12-27|          0|  1|      001| 576|not_child|
|         97|       0|    1|Goldschmidt, Mr. ...|male| 71|    0|    0|  PC 17754|34.6542|   A5|       C| 1841-12-28|          0|  1|      001| 574|not_child|
|        494|       0|    1|Artagaveytia, Mr....|mal

In [33]:
# Create a pivot table
pivot_df = df.groupBy('Class').pivot('Sex').agg(sum('Survived'))
pivot_df.show(5)

+-----+------+----+
|Class|female|male|
+-----+------+----+
|    1|    91|  45|
|    3|    72|  47|
|    2|    70|  17|
+-----+------+----+



## Joins

In [34]:
ages=df.select(col('PassengerId'),col('Age'))
ages.show(10)
no_ages=df.drop('Age')
no_ages.show(10)
ages_not_null=ages.filter(col('Age').isNotNull())
ages_not_null.show(10)

+-----------+---+
|PassengerId|Age|
+-----------+---+
|        463| 47|
|        148|  9|
|        471| 29|
|        496| 29|
|        833| 29|
|        540| 22|
|        858| 51|
|        243| 29|
|        392| 21|
|        623| 20|
+-----------+---+
only showing top 10 rows

+-----------+--------+-----+--------------------+------+-----+-----+-------+-------+-------+--------+-----------+-----------+---+---------+----+--------+
|PassengerId|Survived|Class|                Name|   Sex|SibSp|Parch| Ticket|   Fare|  Cabin|Embarked|DateOfBirth|not_married|one|paddedOne|rank|age_rank|
+-----------+--------+-----+--------------------+------+-----+-----+-------+-------+-------+--------+-----------+-----------+---+---------+----+--------+
|        470|       1|    3|Baclini, Miss. He...|female|    2|    1|   2666|19.2583|   NULL|       C| 1912-04-03|          1|  1|      001|   1|   child|
|        645|       1|    3|Baclini, Miss. Eu...|female|    2|    1|   2666|19.2583|   NULL|       C| 1911

In [35]:
# pyspark syntax for joins
print(f'number of initial rows is {no_ages.count()}')
for type_of_join in ['inner', 'left', 'right']:
    join=ages.join(ages_not_null, ages['PassengerId']==ages_not_null['PassengerId'], how=type_of_join)
    print(f'number of rows for {type_of_join} is {join.count()}')

number of initial rows is 891
number of rows for inner is 891
number of rows for left is 891
number of rows for right is 891


## Dates formatting

In [36]:
# working with dates

# Define data
data = [('2019-12-25 13:30:00',)]
columns = ['Christmas']

# Create DataFrame directly from data
df_dates = spark.createDataFrame(data, columns)
df_dates.show()

+-------------------+
|          Christmas|
+-------------------+
|2019-12-25 13:30:00|
+-------------------+



In [37]:
df_dates=df_dates.withColumn('date',to_date(col('Christmas'),'yyyy-MM-dd HH:mm:ss'))
df_dates=df_dates.withColumn('time_stamp',to_timestamp(col('Christmas'),'yyyy-MM-dd HH:mm:ss'))
df_dates.show()

+-------------------+----------+-------------------+
|          Christmas|      date|         time_stamp|
+-------------------+----------+-------------------+
|2019-12-25 13:30:00|2019-12-25|2019-12-25 13:30:00|
+-------------------+----------+-------------------+



In [38]:
# Define data
data = [('25/Dec/2019 13:30:00',)]
columns = ['Christmas']

# Create DataFrame directly from data
df_dates = spark.createDataFrame(data, columns)
df_dates.show()

+--------------------+
|           Christmas|
+--------------------+
|25/Dec/2019 13:30:00|
+--------------------+



In [39]:
df_dates=df_dates.withColumn('date',to_date(col('Christmas'),'dd/MMM/yyyy HH:mm:ss'))
df_dates=df_dates.withColumn('time_stamp',to_timestamp(col('Christmas'),'dd/MMM/yyyy HH:mm:ss'))
df_dates.show()

+--------------------+----------+-------------------+
|           Christmas|      date|         time_stamp|
+--------------------+----------+-------------------+
|25/Dec/2019 13:30:00|2019-12-25|2019-12-25 13:30:00|
+--------------------+----------+-------------------+



In [40]:
# Define data
data = [('12/25/2019 01:30:00 PM',)]
columns = ['Christmas']

# Create DataFrame directly from data
df_dates = spark.createDataFrame(data, columns)
df_dates.show(1, truncate=False)

+----------------------+
|Christmas             |
+----------------------+
|12/25/2019 01:30:00 PM|
+----------------------+



In [41]:
df_dates=df_dates.withColumn('date',to_date(col('Christmas'),'MM/dd/yyyy hh:mm:ss a'))
df_dates=df_dates.withColumn('time_stamp',to_timestamp(col('Christmas'),'MM/dd/yyyy hh:mm:ss a'))
df_dates.show()

+--------------------+----------+-------------------+
|           Christmas|      date|         time_stamp|
+--------------------+----------+-------------------+
|12/25/2019 01:30:...|2019-12-25|2019-12-25 13:30:00|
+--------------------+----------+-------------------+

