# Non Res PIR linear regression model

### Linear regression model to convert non res PIR people figures to filled post estimates

In [24]:
import polars as pl
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score
# from sklearn.feature_extraction import DictVectorizer
import matplotlib.pyplot as plt
import pickle
import io
import boto3

Specify the bucket and the source path of the initial data.

In [2]:
BUCKET = 'sfc-mt-sagemaker-demo'  # used for demo purposes
DATA = 's3://sfc-main-datasets/domain=ind_cqc_filled_posts/dataset=ind_cqc_estimated_missing_ascwds_filled_posts/'

### Read AWS Glue Schema

In [27]:
from utilities.schema_reader import GlueSchemaReader
gsr = GlueSchemaReader("main-data-engineering-database")

ModuleNotFoundError: No module named 'utilities'

In [None]:
plschema = gsr.get_polars_schema("dataset_ind_cqc_estimated_missing_ascwds_filled_posts")

### Read data from S3 (pass the schema once the Glue schema reader is working)

```scan_parquet``` reads the data as a [LazyFrame](https://docs.pola.rs/api/python/stable/reference/lazyframe/index.html), which you can filter before any data is loaded.

Before using the data in a model you need to ```collect()``` it, as shown further below.

In [3]:
df1 = pl.scan_parquet(DATA,schema=plschema)

In [None]:
df1.schema

Filtering dataset to required data using Polars syntax.

In [4]:
df1_selected = df1.select(
    "locationId",
    "cqc_location_import_date",
    "careHome",
    "ascwds_filled_posts_deduplicated_clean",
    "pir_people_directly_employed_deduplicated",
)
df1_filtered = df1_selected.filter(pl.col("careHome") == "N")
df1_filtered2 = df1_filtered.filter(
    (pl.col("ascwds_filled_posts_deduplicated_clean").is_not_null()) 
    & (pl.col("ascwds_filled_posts_deduplicated_clean") > 0) 
    & (pl.col("pir_people_directly_employed_deduplicated").is_not_null())
    & (pl.col("pir_people_directly_employed_deduplicated") > 0)
)
df1_filtered3 = df1_filtered2.with_columns(
    (pl.col("ascwds_filled_posts_deduplicated_clean") - pl.col("pir_people_directly_employed_deduplicated")).abs().alias("abs_resid"))

Test that the dataset matches the example model.

In [None]:
# Check that a single import date returns the expected 28,000 rows
dftest = df1.filter(pl.col("import_date") == 20250301)
dftest.collect().shape

In [5]:
df1_filtered3.sort(pl.col("abs_resid"), descending=True).collect().head(10)

# shows the same data as the EMR notebook

| locationId | cqc_location_import_date | careHome | ascwds_filled_posts_deduplicated_clean | pir_people_directly_employed_deduplicated | abs_resid |
| --- | --- | --- | --- | --- | --- |
| str | date | str | f64 | i32 | f64 |
| "1-133394328" | 2023-04-01 | "N" | 548.0 | 82 | 466.0 |
| "1-8607670975" | 2023-10-01 | "N" | 567.0 | 135 | 432.0 |
| "1-4776578785" | 2023-08-01 | "N" | 998.0 | 612 | 386.0 |
| "1-3679714454" | 2025-01-01 | "N" | 973.0 | 615 | 358.0 |
| "1-12291638633" | 2023-01-01 | "N" | 374.0 | 23 | 351.0 |
| "1-6957740561" | 2022-07-01 | "N" | 460.0 | 125 | 335.0 |
| "1-1678280100" | 2024-12-01 | "N" | 730.0 | 422 | 308.0 |
| "1-2470563606" | 2025-01-01 | "N" | 46.0 | 314 | 268.0 |
| "1-2013534526" | 2024-10-08 | "N" | 330.0 | 99 | 231.0 |
| "1-6957740561" | 2024-03-01 | "N" | 340.5 | 113 | 227.5 |


Create filtered and excluded datasets

In [6]:
filtered_df = df1_filtered3.filter(pl.col("abs_resid") <= 500).drop("abs_resid")
excluded_df = df1_filtered3.filter(pl.col("abs_resid") > 500).drop("abs_resid")

Previously, within EMR, PySpark required the data to be vectorised for modelling.

As the dataset is small enough, this is not required in scikit-learn ahead of linear regression. If it is needed for more complicated models in the future, see the syntax below.

In [None]:
# vectorised_features_df = DictVectorizer()  # does what VectorAssembler() did in PySpark (required for the PySpark model)

## Separate into training and test sets

In [None]:
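TRAIN_FRAC = 0.8
# collect() converts the LazyFrame to an eager DataFrame, which is needed for modelling
df_eager = filtered_df.collect()
# randomly sample 80% of the rows for training (seed fixed for reproducibility)
df_train1 = df_eager.sample(fraction=TRAIN_FRAC, with_replacement=False, shuffle=True, seed=55)
# the test set is every row not in the training sample (anti-join on all columns)
df_test1 = df_eager.join(df_train1, on=df_eager.columns, how='anti')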
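
One caveat with the anti-join approach: any unsampled row that is identical in every column to a sampled row is also dropped from the test set. A possible alternative, sketched here on made-up numpy arrays rather than the real data, is scikit-learn's `train_test_split`, which splits by row position.

```python
import numpy as np
from sklearn.model_selection import train_test_split

# Made-up feature/target arrays, including a deliberate duplicate row.
x = np.array([10, 20, 20, 30, 40, 50, 60, 70, 80, 90]).reshape(-1, 1)
y = np.array([12.0, 25.0, 25.0, 33.0, 41.0, 57.0, 66.0, 72.0, 88.0, 95.0])

# train_size matches TRAIN_FRAC; random_state fixes the shuffle for reproducibility.
x_train, x_test, y_train, y_test = train_test_split(
    x, y, train_size=0.8, shuffle=True, random_state=55
)

# Every row lands in exactly one of the two sets, duplicates included.
print(len(x_train), len(x_test))
```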

# Go through and explain each part of the code and what it does 

## Training
1. Put the data into numpy arrays for use by scikit-learn
2. Create and fit the model
3. Get the predicted values from the model

In [None]:
# input feature: reshape the 1D array into a 2D column vector, as required by sklearn
x1 = df_train1['pir_people_directly_employed_deduplicated'].to_numpy().reshape(-1, 1)
# target variable
y1 = df_train1['ascwds_filled_posts_deduplicated_clean'].to_numpy()
model1 = LinearRegression()
model1.fit(x1, y1)              # fit the linear regression model
y_pred1 = model1.predict(x1)    # predict filled posts from PIR people

scikit-learn offers a wide range of [linear models](https://scikit-learn.org/stable/modules/linear_model.html) that can be imported for more complicated models.

One example that could be used instead of this simple model is [Polynomial Regression](https://scikit-learn.org/stable/modules/linear_model.html#polynomial-regression-extending-linear-models-with-basis-functions).
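
As a rough sketch of what polynomial regression looks like in scikit-learn, the pipeline below adds a squared term before fitting an ordinary linear regression. The data is synthetic and the degree of 2 is an arbitrary choice for illustration, not a recommendation for the PIR data.

```python
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import PolynomialFeatures

# Synthetic data following y = 1 + 2x + 3x^2 exactly (illustration only).
x = np.arange(0, 10, dtype=float).reshape(-1, 1)
y = 1 + 2 * x.ravel() + 3 * x.ravel() ** 2

# PolynomialFeatures adds x^2 as an extra column; LinearRegression then fits as usual.
poly_model = make_pipeline(
    PolynomialFeatures(degree=2, include_bias=False),
    LinearRegression(),
)
poly_model.fit(x, y)

# The fitted intercept and coefficients should recover 1 and [2, 3].
linear_step = poly_model.named_steps["linearregression"]
print(linear_step.intercept_, linear_step.coef_)
```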

In [None]:
fig, ax = plt.subplots(figsize=(16, 8))
ax.scatter(x1, y1, alpha=0.5, s=10, label='pir_people_directly_employed_deduplicated')
ax.plot(x1, y_pred1, color='red', linewidth=2, label='Linear Regression')
# ax.set_title('')
ax.set_xlabel('pir_people_directly_employed_deduplicated')
ax.set_ylabel('ascwds_filled_posts_deduplicated_clean')
ax.ticklabel_format(style='plain', axis='y')
ax.grid(True, linestyle=':', alpha=0.7)
ax.legend()
plt.tight_layout()
plt.show()

# ask gary to take through each of the regressions he used - I can add this but not a priority 
# find list of each of these within 

### Verify that the $R^2$ is good

In [None]:
r2_1 = r2_score(y1, y_pred1)

print(f'The model has an R2 score of {r2_1}')
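
For reference, $R^2$ compares the model's squared error with that of always predicting the mean: $R^2 = 1 - SS_{res} / SS_{tot}$. The sketch below computes it by hand on a handful of made-up values; the helper name `r_squared` is illustrative, and `r2_score` should give the same number for the same inputs.

```python
import numpy as np

def r_squared(y_true, y_pred):
    """R^2 = 1 - SS_res / SS_tot."""
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    ss_res = np.sum((y_true - y_pred) ** 2)          # variation left unexplained by the model
    ss_tot = np.sum((y_true - y_true.mean()) ** 2)   # total variation around the mean
    return 1.0 - ss_res / ss_tot

# Made-up values for illustration.
example_r2 = r_squared([3.0, -0.5, 2.0, 7.0], [2.5, 0.0, 2.0, 8.0])
print(round(example_r2, 4))
```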

## Check model against test set

In [None]:
x_test1 = df_test1['pir_people_directly_employed_deduplicated'].to_numpy().reshape(-1,1)
y_test1 = df_test1['ascwds_filled_posts_deduplicated_clean'].to_numpy()
y_test_pred1 = model1.predict(x_test1)
r2_test1 = r2_score(y_test1, y_test_pred1)
print(f'The model scores {r2_test1} on test data')

Extract the model parameters if desired.

In [None]:
slope = model1.coef_[0]
intercept = model1.intercept_
print(f'The model slope is {slope}, and the intercept is {intercept}')

## Save the model to S3

In [None]:
s3 = boto3.client('s3')
try:
    s3.delete_object(Bucket='sfc-mt-sagemaker-demo', Key='params/v1/params.pkl')
    print("Deleted params/v1/params.pkl from sfc-mt-sagemaker-demo")
    s3.delete_object(Bucket='sfc-mt-sagemaker-demo', Key='params/v2/params.pkl')
    print("Deleted params/v2/params.pkl from sfc-mt-sagemaker-demo")
except Exception as e:
    print(e)
    print('Deletion failed, possibly no files')

We will serialise the model using the built-in [pickle](https://docs.python.org/3/library/pickle.html) module. For a simple linear model, it is possible just to save the model parameters in plain text, but it is better practice to save the full model instance for re-use.

In [None]:
BUCKET = 'sfc-mt-sagemaker-demo'

buffer = io.BytesIO()                                     # creates in memory binary stream 
pickle.dump(model1, buffer)
buffer.seek(0)                                            # moves the 'cursor' to the start of the buffer 
s3.upload_fileobj(buffer, BUCKET, 'params/v1/params.pkl') 
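
The two options mentioned above (full model versus plain-text parameters) can be compared without touching S3. This sketch uses an in-memory buffer and a throwaway model fitted on made-up data; the JSON key names are illustrative.

```python
import io
import json
import pickle

import numpy as np
from sklearn.linear_model import LinearRegression

# Fit a throwaway model on made-up data (illustration only).
x = np.array([[1.0], [2.0], [3.0], [4.0]])
y = np.array([3.0, 5.0, 7.0, 9.0])
model = LinearRegression().fit(x, y)

# Option 1: pickle the full model into an in-memory buffer and load it back.
buffer = io.BytesIO()
pickle.dump(model, buffer)
buffer.seek(0)
restored = pickle.load(buffer)

# Option 2: keep only the parameters as plain text (JSON).
params_json = json.dumps(
    {"slope": float(model.coef_[0]), "intercept": float(model.intercept_)}
)

print(restored.predict([[5.0]]), params_json)
```

Note that a pickle should only be loaded from a trusted source, and ideally with the same scikit-learn version that wrote it.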

## New data

Now we imagine that some time has passed and new data is available. First the analyst reads in the model that has been saved to S3. They then read in the new data and compare it with the model predictions. If the fit turns out to be poor, the training/testing/serialisation cycle is restarted.

## Retrieve the serialised model

In [None]:
download = io.BytesIO()
s3.download_fileobj(BUCKET, 'params/v1/params.pkl', download)
download.seek(0)
loaded_model = pickle.load(download)
loaded_model.coef_[0], loaded_model.intercept_

## Retrieve the latest data

- Set out the new data location within the S3 bucket.
- Use ```scan_parquet()``` to pull the data in as a LazyFrame, then ```collect()``` it into an eager DataFrame so the model predictions can be compared with the new data.
- Plot the data as required, as above.
- Repeat training, testing and serialisation.
- Save to S3 as a new version.
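
The comparison step in the list above could be sketched as follows. The helper `needs_retraining` and the threshold value are hypothetical, and the "loaded" model here is a stand-in fitted on synthetic data rather than the one retrieved from S3.

```python
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score

# Hypothetical cut-off; choose a value appropriate to the real model.
R2_THRESHOLD = 0.8

def needs_retraining(model, x_new, y_new, threshold=R2_THRESHOLD):
    """Return True when the model's R^2 on the new data falls below the threshold."""
    return r2_score(y_new, model.predict(x_new)) < threshold

# Stand-in for the loaded model, fitted on made-up data following y = 2x + 1.
x_old = np.arange(1, 11, dtype=float).reshape(-1, 1)
model = LinearRegression().fit(x_old, 2 * x_old.ravel() + 1)

# New data that still follows the old relationship, and new data that does not.
x_new = np.arange(11, 21, dtype=float).reshape(-1, 1)
print(needs_retraining(model, x_new, 2 * x_new.ravel() + 1))   # same relationship
print(needs_retraining(model, x_new, 5 * x_new.ravel() - 3))   # changed relationship
```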

# Test result of Polars vs PySpark

Test that the model fitted using Polars has the same parameters (intercept and gradient) as the one fitted using PySpark.

### As within Non Res PIR linear regression model 

Using Polars:
- Test using whole dataset in order to compare.
- No randomness in the data used.

In [7]:
TRAIN_FRAC_TEST = 1

df_eager1 = filtered_df.lazy().collect()
polars_test = df_eager1.sample(fraction=TRAIN_FRAC_TEST) # no need to input randomness in the dataset
df_test = df_eager1.join(polars_test, on=df_eager1.columns, how='anti')

In [8]:
# input feature: reshape the 1D array into a 2D column vector, as required by sklearn
x1 = polars_test['pir_people_directly_employed_deduplicated'].to_numpy().reshape(-1, 1)
# target variable
y1 = polars_test['ascwds_filled_posts_deduplicated_clean'].to_numpy()
modeltest = LinearRegression()
modeltest.fit(x1, y1)                # fit the linear regression model
y_predtest = modeltest.predict(x1)   # predict filled posts from PIR people

In [10]:
slope = modeltest.coef_[0]
intercept = modeltest.intercept_
print(f'The model slope is {slope}, and the intercept is {intercept}')

The model slope is 1.057218026189747, and the intercept is 5.174214629366368
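
To make those parameters concrete, the fitted line is `filled_posts = slope * pir_people + intercept`. The helper below is a hypothetical illustration (the function name is not part of the notebook) that applies the slope and intercept printed above to a made-up PIR figure.

```python
# Parameters copied from the printed output above.
SLOPE = 1.057218026189747
INTERCEPT = 5.174214629366368

def estimate_filled_posts(pir_people: float) -> float:
    """Apply the fitted line: filled posts = slope * PIR people + intercept."""
    return SLOPE * pir_people + INTERCEPT

# A made-up location reporting 100 directly employed people in the PIR.
estimate = estimate_filled_posts(100)
print(round(estimate, 1))
```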


Using PySpark:
- This was carried out within EMR, and the result saved.
- EMR was used because it supports PySpark, which needs a Spark cluster.
- Again, the whole dataset is used so that the results can be compared.
- No randomness in the dataset.
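
Ordinary least squares with a single feature has a closed-form solution, so in principle both libraries should agree up to floating-point tolerance (assuming the PySpark model is fitted without regularisation). The sketch below computes that closed form on made-up data and shows one way to compare two (slope, intercept) pairs; `ols_closed_form`, `parameters_match` and the tolerance are hypothetical, not part of either notebook.

```python
import math

def ols_closed_form(xs, ys):
    """Slope = cov(x, y) / var(x); intercept = mean(y) - slope * mean(x)."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov_xy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    slope = cov_xy / var_x
    return slope, mean_y - slope * mean_x

def parameters_match(a, b, rel_tol=1e-9):
    """Compare two (slope, intercept) pairs within a relative tolerance."""
    return all(math.isclose(p, q, rel_tol=rel_tol) for p, q in zip(a, b))

# Made-up points lying exactly on y = 2x + 1.
fitted = ols_closed_form([1, 2, 3, 4], [3, 5, 7, 9])
print(fitted, parameters_match(fitted, (2.0, 1.0)))
```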