<div style="background-color:#F18A00; padding:10px;">
    <h1 style="color:white;"> Version Control and Git Flow Task</h1>
    
</div>


## Introduction to Version Control and Git Flow

Version control is a crucial aspect of software development, enabling teams to collaborate effectively on code, track changes, and manage a project's history over time. Git allows multiple developers to work on a project simultaneously, so that changes can be integrated smoothly and conflicts resolved efficiently.

In this task, we will use Git and GitHub to manage changes to a data pipeline. Each ticket represents a task that needs to be completed; decide between you which team members will take which ticket. By following the Git flow, we will ensure that all changes are systematically reviewed and integrated into the main codebase.

## Tickets!

Please complete the following tickets. Decide as a team how to split them between members, then incorporate everyone's changes into the finalised code.

- [x] Set up Git 🥳 
- [ ] Reintroduce the 'Final Outcome' column which was removed in the pipeline
- [ ] Delete longitude and latitude columns
- [ ] Change the exported file type (export it as an Excel file)
- [ ] Change 'Broad Outcome Category' column name to 'Finalised Outcome'
- [ ] Remove rows where the crime type is a bike or car theft
- [ ] Split the 'LSOA Name' column into LSOA region and LSOA number (e.g. if 'LSOA Name' is "Basingstoke and Deane 001A" then the output should be 'LSOA Region: Basingstoke and Deane' and 'LSOA Number: 001A')
- [ ] Split 'Month' into month and year (e.g. "2022-01" would become 'Year: 2022' and 'Month: 01')
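
The two splitting tickets come with worked examples, so a small sketch on toy data may help show the expected shape of the result. This is one possible approach, not the required solution: the function name `split_lsoa_and_month`, the toy rows, and the output column names are assumptions made for illustration, and the column header in the real CSV may be cased differently from 'LSOA Name'.

```python
import pandas as pd

# Toy data for illustration only; the second row is invented.
toy = pd.DataFrame({
    "Month": ["2022-01", "2022-01"],
    "LSOA Name": ["Basingstoke and Deane 001A", "Cheshire East 012B"],
})

def split_lsoa_and_month(df):
    """Hypothetical helper: split 'LSOA Name' and 'Month' into separate columns."""
    out = df.copy()

    # The region can contain spaces, so split once from the right:
    # everything before the last space is the region, the rest is the number.
    lsoa_parts = out["LSOA Name"].str.rsplit(" ", n=1, expand=True)
    out["LSOA Region"] = lsoa_parts[0]
    out["LSOA Number"] = lsoa_parts[1]

    # 'Month' holds strings such as "2022-01": year before the dash, month after.
    month_parts = out["Month"].str.split("-", n=1, expand=True)
    out["Year"] = month_parts[0]
    out["Month"] = month_parts[1]

    # The original combined column is no longer needed.
    return out.drop(columns=["LSOA Name"])

result = split_lsoa_and_month(toy)
print(result)
```

Both new values stay as strings here, which keeps the leading zero in a month such as "01". Whether to convert them to integers is a design choice for the team.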


In [None]:
#farah has added this line

## Task: Make Changes to the Pipeline and Observe Alterations in the GitHub Repository

### 1. Complete Tickets
Choose the ticket that you will be working on and implement the change in the pipeline below. Please edit the pipeline directly, as these changes will be visible once the file is pushed to the repository.

### 2. Push Changes to the Repository
Now that you have made your changes to the pipeline, save this file. Then, using Git CMD, push your changes to a new, separate branch of the repository that was created at the start of the lesson.

Remember to write a detailed description of the changes you made to the pipeline so that your team knows what has been altered.
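
The push step can be sketched as a short sequence of Git commands. The helper below only builds and prints them (a dry run) so you can check them before typing them into Git CMD. The function names, the branch name `ticket/rename-outcome-column`, and the file name `pipeline.ipynb` are hypothetical; substitute your own branch and the actual name of this file.

```python
import shlex
import subprocess

def branch_and_push_commands(branch, files, message, remote="origin"):
    """Hypothetical helper: list the git commands for pushing work on a new branch."""
    return [
        ["git", "checkout", "-b", branch],      # create the branch and switch to it
        ["git", "add", *files],                 # stage the edited file(s)
        ["git", "commit", "-m", message],       # record the change with a description
        ["git", "push", "-u", remote, branch],  # publish the branch and track it
    ]

def run_commands(commands, dry_run=True):
    """Print each command; execute it only when dry_run is False."""
    for cmd in commands:
        print(shlex.join(cmd))
        if not dry_run:
            subprocess.run(cmd, check=True)

commands = branch_and_push_commands(
    branch="ticket/rename-outcome-column",  # assumed branch name
    files=["pipeline.ipynb"],               # assumed file name
    message="Rename 'Broad Outcome Category' to 'Finalised Outcome'",
)
run_commands(commands)  # dry run: prints the commands without running them
```

Keeping `dry_run=True` by default means nothing touches your repository until you have read the commands and are happy with them.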

### Main Pipeline

In [5]:
import os
import logging
import pandas as pd

# Constants
LOCAL_DATA_PATH = './'
LOG_FILE = os.path.join(LOCAL_DATA_PATH, 'pipeline.log')
RAW_DATA_FILE = os.path.join(LOCAL_DATA_PATH, '2022-01-cheshire-street.csv')
OUTCOMES_DATA_FILE = os.path.join(LOCAL_DATA_PATH, '2022-01-cheshire-outcomes.csv')
STAGED_DATA_FILE = os.path.join(LOCAL_DATA_PATH, 'staged_cheshire_street.csv')
PRIMARY_DATA_FILE = os.path.join(LOCAL_DATA_PATH, 'primary_cheshire_street.csv')
REPORTING_DATA_FILE = os.path.join(LOCAL_DATA_PATH, 'reporting_cheshire_street.csv')


# Configure logging
logging.basicConfig(
    filename=LOG_FILE,
    filemode='a',
    format='%(asctime)s %(levelname)s:%(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    level=logging.INFO
)

def ingest_data(file_path):
    """
    Ingest raw data from a CSV file.
    """
    logging.info(f"Starting data ingestion from {file_path}")
    if not os.path.exists(file_path):
        logging.error(f"File not found: {file_path}")
        return None

    try:
        df = pd.read_csv(file_path)
        logging.info(f"Data ingestion from {file_path} completed successfully")
        return df
    except ValueError as e:
        logging.error(f"Error reading the CSV file {file_path}: {e}")
        return None

def merge_data(df, df_outcomes):
    """
    Merge the main data with outcomes data on 'Crime ID'.
    """
    return pd.merge(df, df_outcomes[['Crime ID', 'Outcome type']], how='left', on='Crime ID')

def finaloutcome(df):
    """
    Create 'Final Outcome' column based on 'Outcome type' and 'Last outcome category'.
    """
    # Prefer 'Outcome type'; fall back to 'Last outcome category' where it is missing
    df['Final Outcome'] = df['Outcome type'].fillna(df['Last outcome category'])
    return df

def categorize_outcome(outcome):
    """
    Map a detailed outcome string to a broad outcome category.
    """
    if outcome in ['Unable to prosecute suspect', 
                   'Investigation complete; no suspect identified', 
                   'Status update unavailable']:
        return 'No Further Action'
    elif outcome in ['Local resolution', 
                     'Offender given a caution', 
                     'Action to be taken by another organisation', 
                     'Awaiting court outcome']:
        return 'Non-criminal Outcome'
    elif outcome in ['Further investigation is not in the public interest', 
                     'Further action is not in the public interest', 
                     'Formal action is not in the public interest']:
        return 'Public Interest Consideration'
    else:
        return 'Unknown'  # Or any other category for unknown outcomes

def apply_categorization(df):
    """
    Apply categorization to 'Final Outcome' column.
    """
    df['Broad Outcome Category'] = df['Final Outcome'].apply(categorize_outcome)
    return df

def del_values_street(df):
    """
    Delete unnecessary columns from the DataFrame.
    """
    cols_to_delete = ['Reported by', 'Context', 'Location', 'Last outcome category', 'Outcome type', 'Final Outcome']
    df.drop(columns=cols_to_delete, inplace=True)
    return df

def stage_data(df, df_outcomes, output_file):
    """
    Store the data to a CSV file for staging.
    """
    logging.info("Starting data staging")
    try:
        # Apply transformations
        df = merge_data(df, df_outcomes)
        df = finaloutcome(df)
        df = apply_categorization(df)
        df = del_values_street(df)

        # Save to CSV
        df.to_csv(output_file, index=False)
        logging.info("Data staging completed successfully")
    except Exception as e:
        logging.error(f"Error during data staging: {e}")

def primary_transformations(df):
    """
    Primary Storage Layer: Apply primary transformations to the data.
    """
    # Example transformation: Convert some columns to categorical data type
    df['Crime type'] = df['Crime type'].astype('category')
    df['Broad Outcome Category'] = df['Broad Outcome Category'].astype('category')

    # Example transformation: Create a new column by summing existing columns
    if 'Latitude' in df.columns and 'Longitude' in df.columns:
        df['Location Sum'] = df['Latitude'] + df['Longitude']

    return df

def primary_data(df, output_file):
    """
    Primary Storage Layer: Store the primary transformed data to a CSV file.
    """
    logging.info("Starting primary data transformation")
    try:
        # Apply primary transformations
        df = primary_transformations(df)

        # Save to CSV
        df.to_csv(output_file, index=False)
        logging.info("Primary data transformation completed successfully")
    except Exception as e:
        logging.error(f"Error during primary data transformation: {e}")

def reporting_aggregation(df):
    """
    Reporting Layer: Aggregate data for reporting purposes.
    """
    # Example aggregation: Count of crimes by crime type and broad outcome category
    agg_df = df.groupby(['Crime type', 'Broad Outcome Category']).size().reset_index(name='Count')

    return agg_df

def reporting_data(df, output_file):
    """
    Reporting Layer: Store the aggregated reporting data to a CSV file.
    """
    logging.info("Starting reporting data aggregation")
    try:
        # Apply aggregation
        agg_df = reporting_aggregation(df)

        # Save to CSV
        agg_df.to_csv(output_file, index=False)
        logging.info("Reporting data aggregation completed successfully")
    except Exception as e:
        logging.error(f"Error during reporting data aggregation: {e}")

def main():
    logging.info("Pipeline execution started")
    try:
        df = ingest_data(RAW_DATA_FILE)
        df_outcomes = ingest_data(OUTCOMES_DATA_FILE)
        
        if df is not None and df_outcomes is not None:
            stage_data(df, df_outcomes, STAGED_DATA_FILE)
            df_staged = ingest_data(STAGED_DATA_FILE)  # Read the staged data for further processing
            primary_data(df_staged, PRIMARY_DATA_FILE)
            df_primary = ingest_data(PRIMARY_DATA_FILE)  # Read the primary data for reporting
            reporting_data(df_primary, REPORTING_DATA_FILE)
            logging.info("Pipeline execution completed successfully")
        else:
            # Do not report success when an input file could not be read
            logging.error("Pipeline execution aborted: input data could not be ingested")
    except Exception as e:
        logging.critical(f"Pipeline execution failed: {e}")

if __name__ == "__main__":
    main()
